文章推薦系統(tǒng) | 十、基于熱門文章和新文章的在線召回

推薦閱讀:
文章推薦系統(tǒng) | 一、推薦流程設計
文章推薦系統(tǒng) | 二、同步業(yè)務數據
文章推薦系統(tǒng) | 三、收集用戶行為數據
文章推薦系統(tǒng) | 四、構建離線文章畫像
文章推薦系統(tǒng) | 五、計算文章相似度
文章推薦系統(tǒng) | 六、構建離線用戶畫像
文章推薦系統(tǒng) | 七、構建離線文章特征和用戶特征
文章推薦系統(tǒng) | 八、基于模型的離線召回
文章推薦系統(tǒng) | 九、基于內容的離線及在線召回

在上篇文章中我們實現了基于內容的在線召回,接下來,我們將實現基于熱門文章和新文章的在線召回。主要思路是根據點擊次數,統(tǒng)計每個頻道下的熱門文章,根據發(fā)布時間統(tǒng)計每個頻道下的新文章,當推薦文章不足時,可以根據這些文章進行補足。

由于數據量較小,這里采用 Redis 存儲熱門文章和新文章的召回結果,數據結構如下所示

熱門文章召回 結構 示例
popular_recall ch:{}:hot ch:18:hot
新文章召回 結構 示例
new_article ch:{}:new ch:18:new

熱門文章存儲,鍵為 ch:頻道ID:hot 值為 分數文章ID

# ZINCRBY key increment member
# ZSCORE
# 為有序集 key 的成員 member 的 score 值加上增量 increment 。
client.zincrby("ch:{}:hot".format(row['channelId']), 1, row['param']['articleId'])

# ZREVRANGE key start stop [WITHSCORES]
client.zrevrange(ch:{}:new, 0, -1)

新文章存儲,鍵為 ch:{頻道ID}:new 值為 文章ID:時間戳

# ZADD ZRANGE
# ZADD key score member [[score member] [score member] ...]
# ZRANGE page_rank 0 -1
client.zadd("ch:{}:new".format(channel_id), {article_id: time.time()})

熱門文章在線召回

首先,添加 Spark Streaming 和 Kafka 的配置,熱門文章讀取由業(yè)務系統(tǒng)發(fā)送到 Kafka 的 click-trace 主題中的用戶實時行為數據

KAFKA_SERVER = "192.168.19.137:9092"
click_kafkaParams = {"metadata.broker.list": KAFKA_SERVER}
HOT_DS = KafkaUtils.createDirectStream(stream_c, ['click-trace'], click_kafkaParams)

接下來,利用 Spark Streaming 讀取 Kafka 中的用戶行為數據,篩選出被點擊過的文章,將 Redis 中的文章熱度分數進行累加即可

client = redis.StrictRedis(host=DefaultConfig.REDIS_HOST, port=DefaultConfig.REDIS_PORT, db=10)

def update_hot_redis(self):
    """
    收集用戶行為,更新熱門文章分數
    :return:
    """
    def update_hot_article(rdd):
        for data in rdd.collect():
            # 過濾用戶行為
            if data['param']['action'] in ['exposure', 'read']:
                pass
            else:
                client.zincrby("ch:{}:hot".format(data['channelId']), 1, data['param']['articleId'])

    HOT_DS.map(lambda x: json.loads(x[1])).foreachRDD(update_hot_article)

測試,寫入用戶行為日志

echo {\"actionTime\":\"2019-04-10 21:04:39\",\"readTime\":\"\",\"channelId\":18,\"param\":{\"action\": \"click\", \"userId\": \"2\", \"articleId\": \"14299\", \"algorithmCombine\": \"C2\"}} >> userClick.log

查詢熱門文章

127.0.0.1:6379[10]> keys *
1) "ch:18:hot"
127.0.0.1:6379[10]> ZRANGE "ch:18:hot" 0 -1
1) "14299"

新文章在線召回

首先,添加 Spark Streaming 和 Kafka 的配置,新文章讀取由業(yè)務系統(tǒng)發(fā)送到 Kafka 的 new-article 主題中的最新發(fā)布文章數據

NEW_ARTICLE_DS = KafkaUtils.createDirectStream(stream_c, ['new-article'], click_kafkaParams)

接下來,利用 Spark Streaming 讀取 Kafka 的新文章,將其按頻道添加到 Redis 中,Redis 的值為當前時間

def  update_new_redis(self):
    """更新頻道最新文章
    :return:
    """
    def add_new_article(rdd):
        for row in rdd.collect():
            channel_id, article_id = row.split(',')
            client.zadd("ch:{}:new".format(channel_id), {article_id: time.time()})

    NEW_ARTICLE_DS.map(lambda x: x[1]).foreachRDD(add_new_article)

還需要在 Kafka 的啟動腳本中添加 new-article 主題監(jiān)聽配置,這樣就可以收到業(yè)務系統(tǒng)發(fā)送過來的新文章了,重新啟動 Flume 和 Kafka

/root/bigdata/kafka/bin/kafka-topics.sh --zookeeper 192.168.19.137:2181 --create --replication-factor 1 --topic new-article --partitions 1

測試,向 Kafka 發(fā)送新文章數據

from kafka import KafkaProducer 

# kafka消息生產者
kafka_producer = KafkaProducer(bootstrap_servers=['192.168.19.137:9092'])

# 構造消息并發(fā)送
msg = '{},{}'.format(18, 13891)
kafka_producer.send('new-article', msg.encode())

查看新文章

127.0.0.1:6379[10]> keys *
1) "ch:18:hot"
2) "ch:18:new"
127.0.0.1:6379[10]> ZRANGE "ch:18:new" 0 -1
1) "13890"
2) "13891"

最后,修改 online_update.py,加入基于熱門文章和新文章的在線召回邏輯,開啟實時運行即可

if __name__ == '__main__':
    ore = OnlineRecall()
    ore.update_content_recall()
    ore.update_hot_redis()
    ore.update_new_redis()
    stream_sc.start()
    # 使用 ctrl+c 可以退出服務
    _ONE_DAY_IN_SECONDS = 60 * 60 * 24
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        pass

到這里,我們就完成了召回階段的全部工作,包括基于模型和基于內容的離線召回,以及基于內容、熱門文章和新文章的在線召回。通過召回,我們可以從數百萬甚至上億的原始物品數據中,篩選出和用戶相關的幾百、幾千個可能感興趣的物品,后面,我們將要進入到排序階段,對召回的幾百、幾千個物品進行進一步的篩選和排序。

參考

https://www.bilibili.com/video/av68356229
https://pan.baidu.com/s/1-uvGJ-mEskjhtaial0Xmgw(學習資源已保存至網盤, 提取碼:eakp)

?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容