文章推薦系統(tǒng) | 六、構(gòu)建離線用戶(hù)畫(huà)像

推薦閱讀:
文章推薦系統(tǒng) | 一、推薦流程設(shè)計(jì)
文章推薦系統(tǒng) | 二、同步業(yè)務(wù)數(shù)據(jù)
文章推薦系統(tǒng) | 三、收集用戶(hù)行為數(shù)據(jù)
文章推薦系統(tǒng) | 四、構(gòu)建離線文章畫(huà)像
文章推薦系統(tǒng) | 五、計(jì)算文章相似度

前面我們完成了文章畫(huà)像的構(gòu)建以及文章相似度的計(jì)算,接下來(lái),我們就要實(shí)現(xiàn)用戶(hù)畫(huà)像的構(gòu)建了。用戶(hù)畫(huà)像往往是大型網(wǎng)站的重要模塊,基于用戶(hù)畫(huà)像不僅可以實(shí)現(xiàn)個(gè)性化推薦,還可以實(shí)現(xiàn)用戶(hù)分群、精準(zhǔn)推送、精準(zhǔn)營(yíng)銷(xiāo)以及用戶(hù)行為預(yù)測(cè)、商業(yè)化轉(zhuǎn)化分析等,為商業(yè)決策提供數(shù)據(jù)支持。通常用戶(hù)畫(huà)像包括用戶(hù)屬性信息(性別、年齡、出生日期等)、用戶(hù)行為信息(瀏覽、收藏、點(diǎn)贊等)以及環(huán)境信息(時(shí)間、地理位置等)。

處理用戶(hù)行為數(shù)據(jù)

在數(shù)據(jù)準(zhǔn)備階段,我們通過(guò) Flume 已經(jīng)可以將用戶(hù)行為數(shù)據(jù)收集到 Hive 的 user_action 表的 HDFS 路徑中,先來(lái)看一下這些數(shù)據(jù)長(zhǎng)什么樣子,我們讀取當(dāng)天的用戶(hù)行為數(shù)據(jù),注意讀取之前要先關(guān)聯(lián)分區(qū)

_day = time.strftime("%Y-%m-%d", time.localtime())
_localions = '/user/hive/warehouse/profile.db/user_action/' + _day
if fs.exists(_localions):
    # 如果有該文件直接關(guān)聯(lián),捕獲關(guān)聯(lián)重復(fù)異常
    try:
        self.spark.sql("alter table user_action add partition (dt='%s') location '%s'" % (_day, _localions))
    except Exception as e:
        pass

    self.spark.sql("use profile")
    user_action = self.spark.sql("select actionTime, readTime, channelId, param.articleId, param.algorithmCombine, param.action, param.userId from user_action where dt>=" + _day)

user_action 結(jié)果如下所示

可以發(fā)現(xiàn),上面的一條記錄代表用戶(hù)對(duì)文章的一次行為,但通常我們需要查詢(xún)某個(gè)用戶(hù)對(duì)某篇文章的所有行為,所以,我們要將這里用戶(hù)對(duì)文章的多條行為數(shù)據(jù)合并為一條,其中包括用戶(hù)對(duì)文章的所有行為。我們需要新建一個(gè) Hive 表 user_article_basic,這張表包括了用戶(hù) ID、文章 ID、是否曝光、是否點(diǎn)擊、閱讀時(shí)間等等,隨后我們將處理好的用戶(hù)行為數(shù)據(jù)存儲(chǔ)到此表中

create table user_article_basic
(
    user_id     BIGINT comment "userID",
    action_time STRING comment "user actions time",
    article_id  BIGINT comment "articleid",
    channel_id  INT comment "channel_id",
    shared      BOOLEAN comment "is shared",
    clicked     BOOLEAN comment "is clicked",
    collected   BOOLEAN comment "is collected",
    exposure    BOOLEAN comment "is exposured",
    read_time   STRING comment "reading time"
)
    COMMENT "user_article_basic"
    CLUSTERED by (user_id) into 2 buckets
    STORED as textfile
    LOCATION '/user/hive/warehouse/profile.db/user_article_basic';

遍歷每一條原始用戶(hù)行為數(shù)據(jù),判斷用戶(hù)對(duì)文章的行為,在 user_action_basic 中將該用戶(hù)與該文章對(duì)應(yīng)的行為設(shè)置為 True

if user_action.collect():
    def _generate(row):
        _list = []
        if row.action == 'exposure':
            for article_id in eval(row.articleId):
                # ["user_id", "action_time","article_id", "channel_id", "shared", "clicked", "collected", "exposure", "read_time"]
                _list.append(
                    [row.userId, row.actionTime, article_id, row.channelId, False, False, False, True, row.readTime])
            return _list
        else:
            class Temp(object):
                shared = False
                clicked = False
                collected = False
                read_time = ""

            _tp = Temp()
            if row.action == 'click':
                _tp.clicked = True
            elif row.action == 'share':
                _tp.shared = True
            elif row.action == 'collect':
                _tp.collected = True
            elif row.action == 'read':
                _tp.clicked = True

            _list.append(
                [row.userId, row.actionTime, int(row.articleId), row.channelId, _tp.shared, _tp.clicked, _tp.collected,
                 True, row.readTime])
            return _list

    user_action_basic = user_action.rdd.flatMap(_generate)
    user_action_basic = user_action_basic.toDF(
        ["user_id", "action_time", "article_id", "channel_id", "shared", "clicked", "collected", "exposure",
         "read_time"])

user_action_basic 結(jié)果如下所示,這里的一條記錄包括了某個(gè)用戶(hù)對(duì)某篇文章的所有行為

由于 Hive 目前還不支持 pyspark 的原子性操作,所以 user_article_basic 表的用戶(hù)行為數(shù)據(jù)只能全量更新(實(shí)際場(chǎng)景中可以選擇其他語(yǔ)言或數(shù)據(jù)庫(kù)實(shí)現(xiàn))。這里,我們需要將當(dāng)天的用戶(hù)行為與 user_action_basic 的歷史用戶(hù)行為進(jìn)行合并

old_data = uup.spark.sql("select * from user_article_basic")
new_data = old_data.unionAll(user_action_basic)

合并后又會(huì)產(chǎn)生一個(gè)新的問(wèn)題,那就是用戶(hù) ID 和文章 ID 可能重復(fù),因?yàn)榻裉炷硞€(gè)用戶(hù)對(duì)某篇文章的記錄可能在歷史數(shù)據(jù)中也存在,而 unionAll() 方法并沒(méi)有去重,這里我們可以按照用戶(hù) ID 和文章 ID 進(jìn)行分組,利用 max() 方法得到 action_time, channel_id, shared, clicked, collected, exposure, read_time 即可,去重后直接存儲(chǔ)到 user_article_basic 表中

new_data.registerTempTable("temptable")

self.spark.sql('''insert overwrite table user_article_basic select user_id, max(action_time) as action_time, 
        article_id, max(channel_id) as channel_id, max(shared) as shared, max(clicked) as clicked, 
        max(collected) as collected, max(exposure) as exposure, max(read_time) as read_time from temptable 
        group by user_id, article_id''')

表 user_article_basic 結(jié)果如下所示

計(jì)算用戶(hù)畫(huà)像

我們選擇將用戶(hù)畫(huà)像存儲(chǔ)在 Hbase 中,因?yàn)?Hbase 支持原子性操作和快速讀取,并且 Hive 也可以通過(guò)創(chuàng)建外部表關(guān)聯(lián)到 Hbase,進(jìn)行離線分析,如果要?jiǎng)h除 Hive 外部表的話,對(duì) Hbase 也沒(méi)有影響。首先,在 Hbase 中創(chuàng)建用戶(hù)畫(huà)像表

create 'user_profile', 'basic','partial','env'

在 Hive 中創(chuàng)建 Hbase 外部表,注意字段類(lèi)型設(shè)置為 map

create external table user_profile_hbase
(
    user_id         STRING comment "userID",
    information     MAP<STRING, DOUBLE> comment "user basic information",
    article_partial MAP<STRING, DOUBLE> comment "article partial",
    env             MAP<STRING, INT> comment "user env"
)
    COMMENT "user profile table"
    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
        WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,basic:,partial:,env:")
    TBLPROPERTIES ("hbase.table.name" = "user_profile");

創(chuàng)建外部表之后,還需要導(dǎo)入一些依賴(lài)包

cp -r /root/bigdata/hbase/lib/hbase-*.jar /root/bigdata/spark/jars/
cp -r /root/bigdata/hive/lib/h*.jar /root/bigdata/spark/jars/

接下來(lái),讀取處理好的用戶(hù)行為數(shù)據(jù),由于日志中的 channel_id 有可能是來(lái)自于推薦頻道(0),而不是文章真實(shí)的頻道,所以這里要將 channel_id 列刪除

spark.sql("use profile")
user_article_basic = spark.sql("select * from user_article_basic").drop('channel_id')

通過(guò)文章 ID,將用戶(hù)行為數(shù)據(jù)與文章畫(huà)像數(shù)據(jù)進(jìn)行連接,從而得到文章頻道 ID 和文章主題詞

spark.sql('use article')
article_topic = spark.sql("select article_id, channel_id, topics from article_profile")
user_article_topic = user_article_basic.join(article_topic, how='left', on=['article_id'])

user_article_topic 結(jié)果如下圖所示,其中 topics 列即為文章主題詞列表,如 ['補(bǔ)碼', '字符串', '李白', ...]

接下來(lái),我們需要計(jì)算每一個(gè)主題詞對(duì)于用戶(hù)的權(quán)重,所以需要將 topics 列中的每個(gè)主題詞都拆分為單獨(dú)的一條記錄??梢岳?Spark 的 explode() 方法,達(dá)到類(lèi)似“爆炸”的效果

import pyspark.sql.functions as F

user_article_topic = user_topic.withColumn('topic', F.explode('topics')).drop('topics')

user_article_topic 如下圖所示

我們通過(guò)用戶(hù)對(duì)哪些文章發(fā)生了行為以及該文章有哪些主題詞,計(jì)算出了用戶(hù)對(duì)哪些主題詞發(fā)生了行為。這樣,我們就可以根據(jù)用戶(hù)對(duì)主題詞的行為來(lái)計(jì)算主題詞對(duì)用戶(hù)的權(quán)重,并且將這些主題詞作為用戶(hù)的標(biāo)簽。那么,用戶(hù)標(biāo)簽權(quán)重的計(jì)算公式為:用戶(hù)標(biāo)簽權(quán)重 =(用戶(hù)行為分值之和)x 時(shí)間衰減。其中,時(shí)間衰減公式為:時(shí)間衰減系數(shù) = 1 / (log(t) + 1),其中 t 為發(fā)生行為的時(shí)間距離當(dāng)前時(shí)間的大小

不同的用戶(hù)行為對(duì)應(yīng)不同的權(quán)重,如下所示

用戶(hù)行為 分值
閱讀時(shí)間(<1000) 1
閱讀時(shí)間(>=1000) 2
收藏 2
分享 3
點(diǎn)擊 5

計(jì)算用戶(hù)標(biāo)簽及權(quán)重,并存儲(chǔ)到 Hbase 中 user_profile 表的 partial 列族中。注意,這里我們將頻道 ID 和標(biāo)簽一起作為 partial 列族的鍵存儲(chǔ),這樣我們就方便查詢(xún)不同頻道的標(biāo)簽及權(quán)重了

def compute_user_label_weights(partitions):
    """ 計(jì)算用戶(hù)標(biāo)簽權(quán)重
    """
    action_weight = {
        "read_min": 1,
        "read_middle": 2,
        "collect": 2,
        "share": 3,
        "click": 5
    }

    from datetime import datetime
    import numpy as np
    
    # 循環(huán)處理每個(gè)用戶(hù)對(duì)應(yīng)的每個(gè)主題詞
    for row in partitions:
        # 計(jì)算時(shí)間衰減系數(shù)
        t = datetime.now() - datetime.strptime(row.action_time, '%Y-%m-%d %H:%M:%S')
        alpha = 1 / (np.log(t.days + 1) + 1)
        
        if row.read_time  == '':
            read_t = 0
        else:
            read_t = int(row.read_time)
        
        # 計(jì)算閱讀時(shí)間的行為分?jǐn)?shù)
        read_score = action_weight['read_middle'] if read_t > 1000 else action_weight['read_min']
        
        # 計(jì)算各種行為的權(quán)重和并乘以時(shí)間衰減系數(shù)
        weights = alpha * (row.shared * action_weight['share'] + row.clicked * action_weight['click'] +
                          row.collected * action_weight['collect'] + read_score)
        
        # 更新到user_profilehbase表
        with pool.connection() as conn:
            table = conn.table('user_profile')
            table.put('user:{}'.format(row.user_id).encode(),
                      {'partial:{}:{}'.format(row.channel_id, row.topic).encode(): json.dumps(
                          weights).encode()})
            conn.close()

user_topic.foreachPartition(compute_user_label_weights)

在 Hive 中查詢(xún)用戶(hù)標(biāo)簽及權(quán)重

hive> select * from user_profile_hbase limit 1;
OK
user:1  {"birthday":0.0,"gender":null}  {"18:##":0.25704484358604845,"18:&#":0.25704484358604845,"18:+++":0.23934588700996243,"18:+++++":0.23934588700996243,"18:AAA":0.2747964402379244,"18:Animal":0.2747964402379244,"18:Author":0.2747964402379244,"18:BASE":0.23934588700996243,"18:BBQ":0.23934588700996243,"18:Blueprint":1.6487786414275463,"18:Code":0.23934588700996243,"18:DIR......

接下來(lái),要將用戶(hù)屬性信息加入到用戶(hù)畫(huà)像中。讀取用戶(hù)基礎(chǔ)信息,存儲(chǔ)到用戶(hù)畫(huà)像表的 basic 列族即可

def update_user_info():
    """
    更新用戶(hù)畫(huà)像的屬性信息
    :return:
    """
    spark.sql("use toutiao")
    user_basic = spark.sql("select user_id, gender, birthday from user_profile")

    def udapte_user_basic(partition):

        import happybase
        #  用于讀取hbase緩存結(jié)果配置
        pool = happybase.ConnectionPool(size=10, host='172.17.0.134', port=9090)
        for row in partition:
            from datetime import date
            age = 0
            if row.birthday != 'null':
                born = datetime.strptime(row.birthday, '%Y-%m-%d')
                today = date.today()
                age = today.year - born.year - ((today.month, today.day) < (born.month, born.day))

            with pool.connection() as conn:
                table = conn.table('user_profile')
                table.put('user:{}'.format(row.user_id).encode(),
                          {'basic:gender'.encode(): json.dumps(row.gender).encode()})
                table.put('user:{}'.format(row.user_id).encode(),
                          {'basic:birthday'.encode(): json.dumps(age).encode()})
                conn.close()

    user_basic.foreachPartition(udapte_user_basic)

到這里,我們的用戶(hù)畫(huà)像就計(jì)算完成了。

Apscheduler 定時(shí)更新

定義更新用戶(hù)畫(huà)像方法,首先處理用戶(hù)行為日志,拆分文章主題詞,接著計(jì)算用戶(hù)標(biāo)簽的權(quán)重,最后再將用戶(hù)屬性信息加入到用戶(hù)畫(huà)像中

def update_user_profile():
    """
    定時(shí)更新用戶(hù)畫(huà)像的邏輯
    :return:
    """
    up = UpdateUserProfile()
    if up.update_user_action_basic():
        up.update_user_label()
        up.update_user_info()

在 Apscheduler 中添加定時(shí)更新用戶(hù)畫(huà)像任務(wù),設(shè)定每隔 2 個(gè)小時(shí)更新一次

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ProcessPoolExecutor

# 創(chuàng)建scheduler,多進(jìn)程執(zhí)行
executors = {
    'default': ProcessPoolExecutor(3)
}

scheduler = BlockingScheduler(executors=executors)

# 添加一個(gè)定時(shí)運(yùn)行文章畫(huà)像更新的任務(wù), 每隔1個(gè)小時(shí)運(yùn)行一次
scheduler.add_job(update_article_profile, trigger='interval', hours=1)
# 添加一個(gè)定時(shí)運(yùn)行用戶(hù)畫(huà)像更新的任務(wù), 每隔2個(gè)小時(shí)運(yùn)行一次
scheduler.add_job(update_user_profile, trigger='interval', hours=2)

scheduler.start()

另外說(shuō)一下,在實(shí)際場(chǎng)景中,用戶(hù)畫(huà)像往往是非常復(fù)雜的,下面是電商場(chǎng)景的用戶(hù)畫(huà)像,可以了解一下。

參考

https://www.bilibili.com/video/av68356229
https://pan.baidu.com/s/1-uvGJ-mEskjhtaial0Xmgw(學(xué)習(xí)資源已保存至網(wǎng)盤(pán), 提取碼:eakp)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容