==[案例]Spark實時統(tǒng)計訂單量

Spark實時統(tǒng)計訂單量 - 簡書
http://www.itdecent.cn/p/3ec093a9d584

Paste_Image.png

注:組件不了解的同學可參考其他文章,本文主要講項目的實現(xiàn)
1、某些同學會問,直接在業(yè)務系統(tǒng)加入JS埋點通過發(fā)日志不更好嗎?
答:第一、JS埋點業(yè)務系統(tǒng)涉及產(chǎn)品改造,不可能因為一個需求讓你去隨便改業(yè)務系統(tǒng)。第二、即使加入JS埋點也不可能獲得業(yè)務系統(tǒng)的全部數(shù)據(jù)。所以業(yè)務系統(tǒng)核心數(shù)據(jù)還得去業(yè)務系統(tǒng)庫獲取。

2、還有人問加入Kafka太多余
答:第一、加入Kafka為了使系統(tǒng)擴展性更強,可方便對接各種開源產(chǎn)品。第二、通過Kafka消息組可使同一條消息被不同Consumer消費,用戶離線和實時兩條線。


前言
本人GitHub地址:https://github.com/guofei1219QQ : 86608625咨詢項目相關問題的請直接說明問題,不要一直問在嗎?還在嗎?等問題,博主QQ一直健在呢,由于本人平時還要工作,問題不能及時回復請見諒?。。?br> 背景
用戶下單數(shù)據(jù)會通過業(yè)務系統(tǒng)實時產(chǎn)生入庫到mysql庫,我們要統(tǒng)計通某個推廣渠道實時下單量,以便線上運營推廣人員查看不同渠道推廣效果進而執(zhí)行不同推廣策略
系統(tǒng)架構(gòu)

架構(gòu)圖

注:組件不了解的同學可參考其他文章,本文主要講項目的實現(xiàn)1、某些同學會問,直接在業(yè)務系統(tǒng)加入JS埋點通過發(fā)日志不更好嗎?答:第一、JS埋點業(yè)務系統(tǒng)涉及產(chǎn)品改造,不可能因為一個需求讓你去隨便改業(yè)務系統(tǒng)。第二、即使加入JS埋點也不可能獲得業(yè)務系統(tǒng)的全部數(shù)據(jù)。所以業(yè)務系統(tǒng)核心數(shù)據(jù)還得去業(yè)務系統(tǒng)庫獲取。
2、還有人問加入Kafka太多余答:第一、加入Kafka為了使系統(tǒng)擴展性更強,可方便對接各種開源產(chǎn)品。第二、通過Kafka消息組可使同一條消息被不同Consumer消費,用戶離線和實時兩條線。
解析Mysql binlog日志
主要邏輯
1.創(chuàng)建Canal連接2.解析Mysql binlog獲得insert語句
public static void main(String args[]) { //第一個參數(shù)為Canal server服務IP地址如果使用windows開發(fā)連接linux Canal服務需要制定IP eg: new InetSocketAddress("192.168.61.132", 11111) //第二個參數(shù)為Canal server服務端口號 Canal server IP和端口號在 /conf/canal.properties中配置 //第三個參數(shù)為Canal instance名稱 /conf下目錄名稱 //第四第五個參數(shù)為mysql用戶名和密碼,如果在 /conf/example/instance.properties中已經(jīng)配置 這里不用謝 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.61.132", 11111), "example", "", ""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe(".\.."); connector.rollback(); int totalEmtryCount = 120; while (emptyCount < totalEmtryCount) { Message message = connector.getWithoutAck(batchSize); // 獲取指定數(shù)量的數(shù)據(jù) long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // 提交確認 // connector.rollback(batchId); // 處理失敗, 回滾數(shù)據(jù) } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); }}

組裝數(shù)據(jù)發(fā)送至Kafka
private static void printColumn(List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue()); KafkaProducer.sendMsg("canal", UUID.randomUUID().toString() ,column.getName() + " : " + column.getValue()); }}

Streaming分渠道匯總數(shù)據(jù)
以DStream中的數(shù)據(jù)進行按key做reduce操作,然后對各個批次的數(shù)據(jù)進行累加在有新的數(shù)據(jù)信息進入或更新時,可以讓用戶保持想要的任何狀。使用這個功能需要完成兩步:1) 定義狀態(tài):可以是任意數(shù)據(jù)類型2) 定義狀態(tài)更新函數(shù):用一個函數(shù)指定如何使用先前的狀態(tài),從輸入流中的新值更新狀態(tài)。對于有狀態(tài)操作,要不斷的把當前和歷史的時間切片的RDD累加計算,隨著時間的流失,計算的數(shù)據(jù)規(guī)模會變得越來越大。
val orders = resut_lines.updateStateByKey(updateRunningSum _)def updateRunningSum(values: Seq[Long], state: Option[Long]) = {/* state:存放的歷史數(shù)據(jù) values:當前批次匯總值 */Some(state.getOrElse(0L)+values.sum)}

統(tǒng)計結(jié)果寫入Mysql
實時匯總某渠道下單量需要根據(jù)渠道為主鍵更新或插入新數(shù)據(jù)1.當某個渠道第一單時,庫中沒有以此渠道為主鍵的數(shù)據(jù),需要insert into 訂單統(tǒng)計表2.當某渠道在庫中已有該渠道下單量,需要更新此渠道下單量值 update 訂單統(tǒng)計表所以我們使用:

有該渠道就更新,沒有就插入REPLACE INTO order_statistic(chanel, orders) VALUES(?, ?)

orders.foreachRDD(rdd =>{ rdd.foreachPartition(rdd_partition =>{ rdd_partition.foreach(data=>{ if(!data.toString.isEmpty) { System.out.println("訂單量"+" : "+data._2) DataUtil.toMySQL(data._1.toString,data._2.toInt) } }) })})def toMySQL(name: String,orders:Int) = { var conn: Connection = null var ps: PreparedStatement = null val sql = "REPLACE INTO order_statistic(chanel, orders) VALUES(?, ?)" try { Class.forName("com.mysql.jdbc.Driver"); conn = DriverManager.getConnection("jdbc:mysql://192.168.20.126:3306/test", "root", "root") ps = conn.prepareStatement(sql) ps.setString(1, name) ps.setInt(2, orders) ps.executeUpdate() } catch { case e: Exception => e.printStackTrace() } finally { if (ps != null) { ps.close() } if (conn != null) { conn.close() } }}

FAQ
1.canal依賴Canal protobuf版本為2.4.1,而spark依賴的2.5版本
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.4.1</version></dependency>

參考文章
1.Canal wiki:https://github.com/alibaba/canal/wiki2.streaming關于轉(zhuǎn)化操作http://spark.apache.org/docs/1.6.0/streaming-programming-guide.html#transformations-on-dstreams3.mysql的replace intohttp://blog.sina.com.cn/s/blog_5f53615f01016wy3.html
文/MichaelFly(簡書作者)原文鏈接:http://www.itdecent.cn/p/3ec093a9d584著作權(quán)歸作者所有,轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),并標注“簡書作者”。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容