1 mq的作用
解耦、異步、削峰填谷
2 kafka架構(gòu)
1)Producer :消息生產(chǎn)者,就是向 kafka broker 發(fā)消息的客戶端;
2)Consumer :消息消費(fèi)者,向 kafka broker 取消息的客戶端;
3)Consumer Group (CG):消費(fèi)者組,由多個(gè) consumer 組成。消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)
責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),一個(gè)分區(qū)只能由一個(gè)組內(nèi)消費(fèi)者消費(fèi);消費(fèi)者組之間互不影響。所有的消費(fèi)者都屬于某個(gè)消費(fèi)者組,即消費(fèi)者組是邏輯上的一個(gè)訂閱者。
4)Broker :一臺(tái) kafka 服務(wù)器就是一個(gè) broker。一個(gè)集群由多個(gè) broker 組成。一個(gè) broker
可以容納多個(gè) topic。
5)Topic :可以理解為一個(gè)隊(duì)列,生產(chǎn)者和消費(fèi)者面向的都是一個(gè) topic;
6)Partition:為了實(shí)現(xiàn)擴(kuò)展性,一個(gè)非常大的 topic 可以分布到多個(gè) broker(即服務(wù)器)上,
一個(gè) topic 可以分為多個(gè) partition,每個(gè) partition 是一個(gè)有序的隊(duì)列;
7)Replica:副本,為保證集群中的某個(gè)節(jié)點(diǎn)發(fā)生故障時(shí),該節(jié)點(diǎn)上的 partition 數(shù)據(jù)不丟失,且 kafka 仍然能夠繼續(xù)工作,kafka 提供了副本機(jī)制,一個(gè) topic 的每個(gè)分區(qū)都有若干個(gè)副本,一個(gè) leader 和若干個(gè) follower。
8)leader:每個(gè)分區(qū)多個(gè)副本的“主”,生產(chǎn)者發(fā)送數(shù)據(jù)的對(duì)象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對(duì)象都是 leader。
9)follower:每個(gè)分區(qū)多個(gè)副本中的“從”,實(shí)時(shí)從 leader 中同步數(shù)據(jù),保持和 leader 數(shù)據(jù)
的同步。leader 發(fā)生故障時(shí),某個(gè) follower 會(huì)成為新的 follower。
3 kafka存儲(chǔ)機(jī)制
1)topic 是邏輯上的概念,而 partition 是物理上的概念,每個(gè) partition 對(duì)應(yīng)于一個(gè) log 文件,該 log 文件中存儲(chǔ)的就是 producer 生產(chǎn)的數(shù)據(jù)。Producer 生產(chǎn)的數(shù)據(jù)會(huì)被不斷追加到該log 文件末端,且每條數(shù)據(jù)都有自己的 offset。消費(fèi)者組中的每個(gè)消費(fèi)者,都會(huì)實(shí)時(shí)記錄自己消費(fèi)到了哪個(gè) offset,以便出錯(cuò)恢復(fù)時(shí),從上次的位置繼續(xù)消費(fèi)。
2)由于生產(chǎn)者生產(chǎn)的消息會(huì)不斷追加到 log 文件末尾,為防止 log 文件過(guò)大導(dǎo)致數(shù)據(jù)定位效率低下,Kafka 采取了分片和索引機(jī)制,將每個(gè) partition 分為多個(gè) segment。每個(gè) segment對(duì)應(yīng)兩個(gè)文件——“.index”文件和“.log”文件。這些文件位于一個(gè)文件夾下,該文件夾的命名規(guī)則為:topic 名稱+分區(qū)序號(hào)。例如,first 這個(gè) topic 有三個(gè)分區(qū),則其對(duì)應(yīng)的文件夾為 first-0,first-1,first-2。
3)index 和 log 文件以當(dāng)前 segment 的第一條消息的 offset 命名?!?index”文件存儲(chǔ)大量的索引信息,“.log”文件存儲(chǔ)大量的數(shù)據(jù),索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中 message 的物理偏移地址。
4 producer分區(qū)原則
producer 發(fā)送的數(shù)據(jù)封裝成一個(gè) ProducerRecord 對(duì)象。
1)指明 partition 的情況下,直接將指明的值直接作為 partiton 值;
2)沒(méi)有指明 partition 值但有 key 的情況下,將 key 的 hash 值與 topic 的 partition數(shù)進(jìn)行取余得到 partition 值;
3)既沒(méi)有 partition 值又沒(méi)有 key 值的情況下,第一次調(diào)用時(shí)隨機(jī)生成一個(gè)整數(shù)(后面每次調(diào)用在這個(gè)整數(shù)上自增),將這個(gè)值與 topic 可用的 partition 總數(shù)取余得到 partition值,也就是常說(shuō)的 round-robin 算法。
5 生產(chǎn)者丟數(shù)據(jù)?
1)副本同步策略
- 半數(shù)以上完成同步,就發(fā)送 ack 延遲低 選舉新的 leader 時(shí),容忍 n 臺(tái)節(jié)點(diǎn)的故障,需要 2n+1 個(gè)副
- 本全部完成同步,才發(fā)送ack 選舉新的 leader 時(shí),容忍 n 臺(tái) 節(jié)點(diǎn)的故障,需要 n+1 個(gè)副本 延遲高
val properties = new Properties
properties.put("bootstrap.servers", broker_list)
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
properties.put("enable.idempotence",(true: java.lang.Boolean)) //冪等性、開啟事務(wù)
properties.put(ProducerConfig.ACKS_CONFIG, "-1")
var producer: KafkaProducer[String, String] = null
try
producer = new KafkaProducer[String, String](properties)
catch {
case e: Exception => e.printStackTrace()
}
producer
- 極端情況設(shè)置ProducerConfig.RETRY_CONFIG
2)ISR - Leader 維護(hù)了一個(gè)動(dòng)態(tài)的 in-sync replica set (ISR),意為和 leader 保持同步的 follower 集合。當(dāng) ISR 中的 follower 完成數(shù)據(jù)的同步之后,leader 就會(huì)給 follower 發(fā)送 ack。如果 follower長(zhǎng)時(shí)間 未 向 leader 同 步 數(shù) 據(jù) , 則 該 follower 將 被 踢 出 ISR , 該 時(shí) 間 閾 值 由replica.lag.time.max.ms 參數(shù)設(shè)定。Leader 發(fā)生故障之后,就會(huì)從 ISR 中選舉新的 leader。
6 kafka丟數(shù)據(jù)?
1)給 topic 設(shè)置 replication.factor 參數(shù):這個(gè)值必須大于 1,要求每個(gè) partition 必須有至少 2 個(gè)副本。
2)在 Kafka 服務(wù)端設(shè)置 min.insync.replicas 參數(shù):這個(gè)值必須大于 1,這個(gè)是要求一個(gè) leader 至少感知到有至少一個(gè) follower 還跟自己保持聯(lián)系,沒(méi)掉隊(duì),這樣才能確保 leader 掛了還有一個(gè) follower 吧。
7 消費(fèi)者丟數(shù)據(jù)?
- 關(guān)閉自動(dòng)提交 offset,在處理完之后自己手動(dòng)提交 offset,就可以保證數(shù)據(jù)不會(huì)丟
// kafka消費(fèi)者配置
var kafkaParam = collection.mutable.Map(
"bootstrap.servers" -> broker_list, //用于初始化鏈接到集群的地址
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
//用于標(biāo)識(shí)這個(gè)消費(fèi)者屬于哪個(gè)消費(fèi)團(tuán)體
"group.id" -> "gmall_group",
//latest自動(dòng)重置偏移量為最新的偏移量
"auto.offset.reset" -> "latest",
//如果是true,則這個(gè)消費(fèi)者的偏移量會(huì)在后臺(tái)自動(dòng)提交,但是kafka宕機(jī)容易丟失數(shù)據(jù)
//如果是false,會(huì)需要手動(dòng)維護(hù)kafka偏移量
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// 存儲(chǔ)每個(gè)分區(qū)的offset
def saveOffset(topic: String, groupId: String, offsetRanges: Array[OffsetRange]): Unit = {
//拼接redis中操作偏移量的key
var offsetKey = "offset:" + topic + ":" + groupId
//定義java的map集合,用于存放每個(gè)分區(qū)對(duì)應(yīng)的偏移量
val offsetMap: util.HashMap[String, String] = new util.HashMap[String, String]()
//對(duì)offsetRanges進(jìn)行遍歷,將數(shù)據(jù)封裝offsetMap
for (offsetRange <- offsetRanges) {
val partitionId: Int = offsetRange.partition
val fromOffset: Long = offsetRange.fromOffset
val untilOffset: Long = offsetRange.untilOffset
offsetMap.put(partitionId.toString, untilOffset.toString)
println("保存分區(qū)" + partitionId + ":" + fromOffset + "----->" + untilOffset)
}
val jedis: Jedis = MyRedisUtil.getJedisClient()
jedis.hmset(offsetKey, offsetMap)
jedis.close()
}
8 重復(fù)消費(fèi)
- Kafka 實(shí)際上有個(gè) offset 的概念,就是每個(gè)消息寫進(jìn)去,都有一個(gè) offset,代表消息的序號(hào),然后 consumer 消費(fèi)了數(shù)據(jù)之后,每隔一段時(shí)間(定時(shí)定期),會(huì)把自己消費(fèi)過(guò)的消息的 offset 提交一下,表示“我已經(jīng)消費(fèi)過(guò)了,下次我要是重啟啥的,你就讓我繼續(xù)從上次消費(fèi)到的 offset 來(lái)繼續(xù)消費(fèi)吧”。
- 新版的 Kafka 已經(jīng)將 offset 的存儲(chǔ)從 Zookeeper 轉(zhuǎn)移至 Kafka brokers,并使用內(nèi)部位移主題 __consumer_offsets 進(jìn)行存儲(chǔ)。
- producer事務(wù)。為了實(shí)現(xiàn)跨分區(qū)跨會(huì)話的事務(wù),需要引入一個(gè)全局唯一的 Transaction ID,并將 Producer獲得的PID 和Transaction ID 綁定。這樣當(dāng)Producer 重啟后就可以通過(guò)正在進(jìn)行的 Transaction ID 獲得原來(lái)的 PID。
// 開啟producer事務(wù)
properties.put("enable.idempotence",(true: java.lang.Boolean)) //冪等性、開啟事務(wù)