Kafka技術內幕

第一部分:初步了解Kafka

Apache Kafka是一個分布式流媒體平臺。這到底是什么意思?
流媒體平臺具有三個關鍵功能:
發(fā)布和訂閱記錄流,類似于消息隊列或企業(yè)消息系統(tǒng)。
以容錯的持久方式存儲記錄流。
記錄發(fā)生時的處理流。

卡夫卡通常用于兩類廣泛的應用:

1.建立實時流數據管道,在系統(tǒng)或應用之間可靠地獲取數據

2.建立對數據流進行轉換或反應的實時流應用程序

為了理解卡夫卡是如何做到這些的,讓我們從下到上探究卡夫卡的能力。

首先是幾個概念:

卡夫卡作為一個集群運行在一個或多個可以跨多個數據中心的服務器上。

卡夫卡集群將記錄流存儲在稱為主題的類別中。
每個記錄由一個鍵、一個值和一個時間戳組成。

Kafka 有四個核心API:

生產者API(Producer API)允許應用程序將記錄流發(fā)布到一個或多個卡夫卡主題。

消費者API(Consumer API)允許應用程序訂閱一個或多個主題,并處理產生給它們的記錄流。

流API(Streams API)允許應用充當流處理器,消耗來自一個或多個主題的輸入流,并將輸出流生成到一個或多個輸出主題,有效地將輸入流轉換為輸出流。

連接器API(Connector API)允許建立和運行可重用的生產者或消費者,將卡夫卡主題連接到現(xiàn)有的應用程序或數據系統(tǒng)。例如,關系數據庫的連接器可能會捕獲表的每一個更改。



在卡夫卡中,客戶端和服務器之間的通信是用簡單、高性能、語言不可知的TCP協(xié)議來完成的。該協(xié)議版本化,并保持與舊版本的向后兼容性。我們?yōu)榭ǚ蚩ㄌ峁㎎ava客戶端,但客戶端可以用多種語言提供。

Topics and Logs

讓我們先深入研究核心抽象卡夫卡為一條記錄流提供的主題。

主題是記錄發(fā)布的類別或進給名稱??ǚ蚩ㄖ械闹黝}總是多用戶的,也就是說,主題可以有一個或多個訂閱該數據的用戶。

對于每個主題,卡夫卡集群維護一個看起來像這樣的分區(qū)日志:


每一個分區(qū)都是一個有序的、不可變的記錄序列,它連續(xù)地附加到結構化提交日志。每個分區(qū)中的記錄都被分配一個連續(xù)的ID號,稱為唯一地標識分區(qū)內的每個記錄的偏移量。

無論是否已經使用可配置的保留周期消耗,所有的已發(fā)布記錄都能持久地保存卡夫卡集群。例如,如果保留策略設置為兩天,那么在發(fā)布記錄后的兩天,它可以用于消費,之后將被丟棄以釋放空間。卡夫卡的性能相對于數據大小是有效不變的,所以長時間存儲數據不是問題。


事實上,在每個消費者基礎上保留的唯一元數據是該用戶在日志中的偏移或位置。這種偏移是由消費者控制的:正常情況下,消費者會在讀取記錄時線性地偏移其偏移量,但事實上,由于該位置是由消費者控制的,所以它可以按它喜歡的任何順序消耗記錄。例如,消費者可以重置為舊的偏移量,以重新處理過去的數據,或者跳過最近的記錄并開始從“現(xiàn)在”開始消費。

這種組合的特征意味著卡夫卡的消費者非常便宜,他們可以來來往往,對集群或其他消費者沒有太大的影響。例如,您可以使用我們的命令行工具來“尾隨”任何主題的內容,而不改變任何現(xiàn)有消費者所消耗的內容。

日志中的分區(qū)有多種用途。首先,它們允許日志超出一個適合于單個服務器的大小。每個單獨的分區(qū)必須適合于承載它的服務器,但是一個主題可能有許多分區(qū),因此它可以處理任意數量的數據。其次,它們作為并行的單位更多的是在這一點上。

Distribution

日志分區(qū)被分布在卡夫卡集群中的服務器上,每個服務器處理數據并請求共享分區(qū)。每個分區(qū)在可配置的多個服務器上復制以容錯。

每個分區(qū)都有一個服務器充當“領導者”和零個或多個服務器,充當“追隨者”。領導者處理所有的讀寫請求的分區(qū),而追隨者被動復制的領導者。如果領導者失敗,其中的一個追隨者將自動成為新的領導者。每個服務器充當一些分區(qū)的領導者和其他人的追隨者,因此集群內的負載很好地平衡。

Geo-Replication

Kafka MirrorMaker為您的集群提供地理復制支持。使用鏡像機,消息在多個數據中心或云區(qū)域上被復制。可以在主動/被動場景中使用此備份和恢復;或者在活動/活動場景中,將數據更靠近用戶,或支持數據位置要求。

Producers

生產者將數據發(fā)布到他們選擇的主題中。生產者負責選擇要分配給主題內的哪個分區(qū)的記錄。這可以在循環(huán)的方式下完成,只是為了平衡負載,或者它可以根據一些語義劃分函數來完成(例如基于記錄中的某些鍵)。

Consumers

消費者用消費者組名稱來標記自己,并且發(fā)布到主題的每個記錄被傳遞到每個訂閱消費者組中的一個消費者實例。消費者實例可以在單獨的進程中或在單獨的機器上。

如果所有的消費者實例都具有相同的消費群,那么記錄將有效地在消費者實例上進行負載均衡。

如果所有的消費者實例都有不同的消費群體,那么每個記錄將被廣播到所有的消費過程。

兩個服務器卡夫卡集群托管四個分區(qū)(P0至P3)與兩個消費群體。消費者組A有兩個消費者實例,B組有四個。

然而,更常見的是,我們發(fā)現(xiàn)話題有少量的消費群體,每個都有一個“邏輯用戶”。每個組由許多可擴展性和容錯性的消費者實例組成。這僅僅是發(fā)布訂閱語義,其中訂閱服務器是一組消費者而不是單個進程。

在卡夫卡中實現(xiàn)消費的方法是通過將日志中的分區(qū)除以消費者實例,使得每個實例在任何時間點都是分區(qū)的“公平共享”的唯一消費者。這個組中的成員保持過程是由卡夫卡協(xié)議動態(tài)處理的。如果新實例加入組,它們將從組的其他成員接管一些分區(qū);如果一個實例死亡,它的分區(qū)將被分發(fā)到其余實例。

卡夫卡只提供一個分區(qū)內的記錄的總順序,而不是在一個主題中的不同分區(qū)之間。每一個分區(qū)排序結合能力分區(qū)數據的關鍵是足夠的大多數應用程序。但是,如果需要對記錄進行總排序,則可以用只有一個分區(qū)的主題來實現(xiàn),但這將意味著每個消費者組只有一個消費者進程。

多租戶技術

可以將卡夫卡部署為多租戶解決方案。多租戶是通過配置哪些主題可以產生或消耗數據來實現(xiàn)的。也有對配額的操作支持。管理員可以在請求上定義和執(zhí)行配額,以控制客戶端使用的代理資源。

保證

在高級別卡夫卡提供以下保證:

生產者發(fā)送給特定主題分區(qū)的消息將按照發(fā)送的順序添加。也就是說,如果記錄M1是由與記錄M2相同的生產者發(fā)送的,M1是最先發(fā)送的,那么M1將具有比M2更低的偏移量,并且在日志中出現(xiàn)得更早。

一個用戶實例以記錄存儲在日志中的順序查看記錄。

對于具有復制因子N的主題,我們將容忍多達N-1服務器故障而不丟失提交到日志的任何記錄。

Kafka可以保證同一個分區(qū)里的消息是有序的,也就是說生產者按照一定的順序發(fā)送消息,broker就會按照這個順序把他們寫入分區(qū),消費者也會按照同樣的順序讀取他們

卡夫卡作為消息傳遞系統(tǒng)

卡夫卡的流概念如何與傳統(tǒng)的企業(yè)消息系統(tǒng)相比較?

消息傳遞傳統(tǒng)上有兩種模式:排隊和發(fā)布訂閱。在隊列中,消費者池可以從服務器讀取,并且每個記錄都轉到其中一個;在發(fā)布訂閱中,記錄被廣播給所有消費者。這兩種模式各有其優(yōu)點和缺點。排隊的優(yōu)點在于,它允許您在多個消費者實例上劃分數據的處理,這使得您可以對處理進行縮放。不幸的是,一旦一個進程讀取了它的數據,隊列就不再是多用戶。發(fā)布訂閱允許您將數據廣播到多個進程,但是由于每個消息都流向每個訂閱服務器,所以無法進行縮放處理。

卡夫卡的消費群體概念概括了這兩個概念。與隊列一樣,消費者組允許您在進程集合(消費者組的成員)上劃分處理。與發(fā)布訂閱一樣,卡夫卡允許您向多個用戶組廣播消息。

卡夫卡模型的優(yōu)點是,每個主題都具有這些屬性,它可以縮放處理,并且也是多用戶,不需要選擇一個或另一個。

卡夫卡比傳統(tǒng)的消息傳遞系統(tǒng)具有更強的排序保證。

傳統(tǒng)隊列在服務器上保留記錄順序,如果多個用戶從隊列中消耗,則服務器按其存儲的順序分發(fā)記錄。然而,盡管服務器按順序分發(fā)記錄,但是記錄是異步傳送給消費者的,因此它們可能會在不同的消費者之間無序地到達。這實際上意味著在并行消耗的存在下記錄的順序丟失。消息傳遞系統(tǒng)經常圍繞著這一點而工作,它有一個“獨占消費者”的概念,它只允許一個進程從隊列中消耗,但這當然意味著在處理過程中沒有并行性。

卡夫卡做得更好。通過在主題中具有并行性分區(qū)的概念,卡夫卡能夠在消費者進程池中提供排序保證和負載平衡。這是通過將主題中的分區(qū)分配給消費者組中的消費者來實現(xiàn)的,這樣每個分區(qū)就被該組中的一個消費者完全消耗掉。通過這樣做,我們確保消費者是該分區(qū)的唯一讀取器,并按順序消耗數據。由于有許多分區(qū),這仍然平衡了許多消費者實例的負載。但是注意,消費者組中的消費者實例不能多于分區(qū)。

卡夫卡作為存儲系統(tǒng)

任何允許發(fā)布消息的消息隊列與它們之間的解耦都有效地充當了飛行消息的存儲系統(tǒng)??ǚ蚩ǖ牟煌幵谟谒且粋€非常好的存儲系統(tǒng)。

寫入卡夫卡的數據被寫入磁盤并復制用于容錯。卡夫卡允許生產商等待確認,這樣寫才被認為是完整的,直到它被完全復制,并保證即使服務器寫入失敗也會堅持。

磁盤結構卡夫卡使用規(guī)模井卡夫卡將執(zhí)行相同的,無論你有50 KB或50 TB的持久性數據在服務器上。

作為認真對待存儲并允許客戶端控制其讀取位置的結果,您可以將卡夫卡視為專用于高性能、低延遲提交日志存儲、復制和傳播的專用分布式文件系統(tǒng)。

用于流處理的卡夫卡

僅僅讀取、寫入和存儲數據流是不夠的,其目的是實現(xiàn)對流的實時處理。

在卡夫卡中,流處理器是從輸入主題獲取連續(xù)數據流的任何東西,對該輸入執(zhí)行一些處理,并產生連續(xù)的數據流到輸出主題。

例如,零售應用程序可能會接收銷售和出貨的輸入流,并輸出從該數據計算的重新排序和價格調整的流。

可以直接使用生產者和消費者API來進行簡單的處理。然而,對于更復雜的轉換,卡夫卡提供了一個完全集成的流API。這允許構建非平凡處理的應用程序,它們可以計算流中的聚合或一起加入流。

這個工具有助于解決這類應用程序面臨的難題:處理無序數據、重新處理輸入、代碼更改、執(zhí)行狀態(tài)計算等。

流API構建在卡夫卡提供的核心原語上:它使用生產者和消費者API來進行輸入,使用卡夫卡進行狀態(tài)存儲,并在流處理器實例中使用相同的組機制來容錯。

把碎片拼在一起

消息傳遞、存儲和流處理的這種組合看起來是不尋常的,但這對于卡夫卡作為流媒體平臺的角色是至關重要的。

分布式文件系統(tǒng)(如HDFS)允許存儲靜態(tài)文件以進行批量處理。實際上,這樣的系統(tǒng)允許存儲和處理過去的歷史數據。

傳統(tǒng)的企業(yè)消息系統(tǒng)允許處理訂閱后到達的未來消息。以這種方式構建的應用程序在到達時處理未來的數據。

卡夫卡結合了這兩種能力,組合對于卡夫卡作為流媒體應用平臺以及流數據管道來說都是至關重要的。

通過結合存儲和低延遲訂閱,流式應用程序可以以相同的方式處理過去和將來的數據。也就是說,單個應用程序可以處理歷史、存儲的數據,而不是在到達最后記錄時結束,它可以在將來的數據到達時保持處理。這是包含批處理和消息驅動應用程序的流處理的一般概念。

同樣,對于流式數據管道,訂閱到實時事件使得使用卡夫卡用于非常低延遲的流水線是可能的;但是,可靠地存儲數據的能力使得它可以用于關鍵數據,其中必須保證數據的傳遞或整合。n具有離線系統(tǒng),只周期性地加載數據,或者可以在長時間內進行維護。流處理設施使得在到達時轉換數據成為可能。

第二部分:Kafka技術進階

生產者概括


Kafka生產者組件圖

Kafka發(fā)送消息的主要步驟
我們從創(chuàng)建一個ProducerRecord對象開始,ProducerRecord對象需要包含目標主題和要發(fā)送的內容.我們還可以指定鍵分區(qū),在發(fā)送ProducerRecord 對象時,生產者要先把鍵和值對象序列化成字節(jié)數組,這樣他們才能在網絡上傳輸.
接下來,數據被傳給分區(qū)器,如果之前在ProducerRecord對象里指定了分區(qū),那么分區(qū)器就不會再做任何事情,直接把指定的分區(qū)返回.如果沒有指定分區(qū),那么分區(qū)器就會根據ProducerRecord對象的鍵來選擇一個分區(qū).選好分區(qū)以后,生產者就知道該往哪個主題和分區(qū)發(fā)送這條消息了.
緊接著,這條記錄被添加到一個記錄批次里,這個批次里的所有消息會被發(fā)送到相同的主題和分區(qū)上.有一個獨立的線程負責把這些記錄批次發(fā)送到相應的broker上.
服務器在收到這些消息時會返回一個響應.如果消息成功寫入Kafka,就返回一個RecordMetaData對象,它包含了主題和分區(qū)信息,以及記錄在分區(qū)里的偏移量.如果寫入失敗,則返回一個錯誤.生產者在收到錯誤之后會嘗試重新發(fā)送消息,幾次之后如果還是失敗,就返回錯誤信息.

創(chuàng)建Kafka生產者

要往Kafka寫入消息,首先要創(chuàng)建一個生產者對象,并設置一些屬性,Kafka生產者有3個必選屬性(什么意思不作說明)

bootstrap.servers
key.serializer
value.serializer
其他默認屬性:
acks
buffer.memory
compression.type
retries
batch.size
linger.ms
client.id
max.in.flight.requests.oer.connection
timeout.ms,request.timeout.ms和metadata.fetch.timeout.ms
max.block.ms
max.request.size
resive.buffer.bytes和send.buffer.bytes

下面代碼片段演示了如何創(chuàng)建一個新生產者,這里只指定了必要的屬性,其他使用默認設置

private Properties kafkaProps = new Properties(); 

kafkaProps.put("bootstrap.servers","broker1:9092,broker2:9092");

kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");

kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

producer = new KafkaProducer<String, String>(kafkaProps);

上述代碼片段主要意思是 先新建一個Properties對象
因為我們打算把鍵和值定義成字符串類型,所以使用內置的StringSerializer
在這里我們創(chuàng)建一個新的生產者對象,并為鍵和值設置了恰當的類型,然后把Properties對象傳給他

實例化生產者對象后,接下來就可以開始發(fā)送消息了.發(fā)送消息主要有以3種方式,

發(fā)送并忘記(fire-and-forger):我們把消息發(fā)送給服務器,但并不關心他是否正常到達.大多數情況下消息會正常到達,因為Kafaka是高可用的,而且生產者會自動嘗試重發(fā),不過使用這種方式有時候也會丟失一些消息
同步發(fā)送send():我們使用send()方式發(fā)送消息,他會返回一個Future對象,調用get()方法進行等待,就可以知道消息是否發(fā)送成功
異步發(fā)送send():我們使用send()方式發(fā)送消息,并指定一個回調函數,服務器在返回響應時調用該函數

上面的例子使用的都是單線程,但其實生產者是可以使用多線程來發(fā)送消息的剛開始的時候可以使用單個消費者和單個線程.如果需要更高的吞吐量,可以在生產者數量不變的情況下增加線程數量.如果這樣還不夠,可以增加生產者數量

發(fā)送消息到Kafka

最簡單消息發(fā)送方式如下所示

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry":"Precision Products","France");
try{
     proucer.send(record);
} catch (Exception e){
         e.printStackTrance();
}

生產者的send() 方法將ProducerRecord 對象作為參數,所以我們要先創(chuàng)建一個ProducerRecord對象.ProducerRecord有多個構造函數,這里只使用其中的一個,他需要目標主題的名字 和要發(fā)送的鍵和值對象 ,他們都是字符串.鍵和值對象的了新鮮感必須與序列化器的生產者對象相匹配

我們使用生產者的send()方法發(fā)送ProducerRecord對象.從生產者的架構圖可以看到,消息是先被放進緩沖區(qū),然后使用單獨的線程發(fā)送到服務器端.send()方法會返回一個包含RecordMetadata的Future對象,不過我們會忽略返回值,所以無法知道消息是否發(fā)送成功,如果不關心發(fā)送結果,那么可以使用這種發(fā)送方式.比如 記錄Twitter消息日志,或記錄不太重要的應用程序日志

同步發(fā)送

最簡單的同步發(fā)送消息方式如下所示

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry","Precision Products","France");

try{
  producer.send(record).get();
} catch (Exception e){
       e.printStackTrance();
}

在這里 producer.send() 方法先返回一個Future對象,然后調用Future對象的get()方法等待Kafka響應,如果服務器返回錯誤,get()方法會拋異常.如果沒有發(fā)生錯誤,我們會得到一個RecordMetadata對象 可以用他來獲取消息的偏移量
如果在發(fā)送數據之前或在發(fā)送過程中發(fā)生了任何錯誤.比如broker返回一個不允許重發(fā)消息的異?;蛘咭呀洺^了重發(fā)的次數,那么就會拋出異常.我們只是簡單的把異常信息打印出來

異步發(fā)送

假設消息在應用程序和Kafka集群之間一個來回需要10ms. 如果在發(fā)送完每個消息后都等待回應,那么發(fā)送100個消息需要1秒 但如果只發(fā)送消息而不等待響應,那么發(fā)送100個消息所需要的時間會少很多.大多數時候,我們并不需要等待響應,盡管Kafka會把目標主題,分區(qū)信息和消息的偏移量發(fā)送回來,但對于發(fā)送端的應用程序來說不是必需的.不過在遇到消息發(fā)送失敗時,我們需要拋出異常,記錄錯誤日志,或把消息寫入錯誤消息文件

為了在異步發(fā)送消息的同時能夠對異常情況進行處理,生產者提供了回調支持,下面是使用回調的一個例子

private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata,Exception e){
if (e != null){
 e.prinStackTrace();
    }
  }
}
ProducerRecord<String, String> record =  new ProducerRecord<>("CustomerCountry","Biomedical Materials","USA");

producer.send(record,new DemoProducerCallback());

為了使用回調,需要實現(xiàn)org.apach.kafka.clients.producer.Callback接口的類,這個接口只有一個onCompletion方法
如果 Kafka返回一個錯誤 onCompletion方法會拋一個非空(non null)異常
在發(fā)送消息時傳進去一個回調對象

消費者(KafkaConsumer)

在了解如何從Kafaka讀取消息之前,我們先先了解一下消費者和消費者群組的概念
假設我們有一個應用程序要從一個Kafka主題讀取消息并驗證這些消息,然后把他們存儲起來,應用程序需要創(chuàng)建一個消費者對象,訂閱主題并開始接收消息,然后驗證消息并保存結果,過了一陣子 生產者讓主題寫入消息的速度超過了應用程序驗證數據的速度,這個時候該怎么辦?如果只使用單個消費者處理消息,應用程序永遠跟不上消息的生成速度,這個時候就需要像多個生產者可以向相同的主題寫入消息一樣,我們也需要使用多個消費者從同一個主題讀取消息,對消息進行分流.Kafka 消費者從屬于消費者群組,一個群組里的消費者訂閱的是同一個主題,每個消費者接收主題一部分分區(qū)的消息,分區(qū)的一個主題消息會被不同消費組訂閱,一個消息只能被每個消費者群組中的一個消費者接收.

Kafka消費者與消費者群組
消費者群組和分區(qū)再均衡

群組里的消費者共同讀取主題的分區(qū),一個新的消費者加入群組時,他讀取的是原本由其他消費者讀取的消息.當一個消費者被關閉或者發(fā)生崩潰時,他就離開群組,原本由他讀取的分區(qū)將有群組里的其他消費者來讀取,在主題發(fā)生變化時,比如管理員添加新的分區(qū),會發(fā)生分區(qū)重分配.分區(qū)的所有權從一個消費者轉移到另外一個消費者這樣的行為稱作再均衡
程序如何觸發(fā)再均衡?
消費者通過向群組協(xié)調器的broker發(fā)送心跳來維持他們和群組的從屬關系以及他們對分區(qū)的所有權關系,只要消費者可以以正常的的時間間隔發(fā)送心跳就被認為是活躍的說明他還在讀取分區(qū)的消息.消費者會在輪詢消息或者提交偏移量時發(fā)送心跳.如果消費者停止發(fā)送心跳的時間足夠長,會話過期,群組協(xié)調器認為他已經死了,就會觸發(fā)一次再均衡.

創(chuàng)建Kafka消費者
{
   Properties props = new Properties();
   props.put("bootstrap.servers","broker1:9092,broker2:9092");
   props.put("group.id","CountryCounter");
   props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
   props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);
}
訂閱主題
consumer.subscribe(Collection.singletonList("customerCountries"));//主題名"customerCountries"
消費者配置

fetch.min.bytes
fetch.max.wait.ms
max.partition.fetch.bytes
session.timeout.ms
auto.offset.reset
enable.auto.commit
partition.assignment.strategy
client.id
max.poll.records
receive.buffer.bytes/send.buffer.bytes

到這里Kafka的基礎內容已經介紹完了 ,如果想深入了解這些是遠遠不夠的 在這里可以推薦幾本書給大家
如果想對Kafka的整體有深刻的認識可以讀<<Kafka權威指南>>必讀
其次就是 <<Kafka技術內幕>> 這兩本書讀完 幾本就OK了
最后也可以讀<<Apache Kafka 源碼剖析>>不建議讀
當然官網讀英文文檔最好了

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • 關于Mongodb的全面總結 MongoDB的內部構造《MongoDB The Definitive Guide》...
    中v中閱讀 32,302評論 2 89
  • 我想拾起記憶中的碎片, 拼湊出一幅天真的畫面。 皎潔的夜空星光璀璨, 小伙伴們躲在葡萄架下; 靜聽牛郎織女伴著仙樂...
    SpringTang閱讀 650評論 0 0
  • 新夢想演講與口才特訓班心得體會 上周末我參加了由新夢想組織兩天一夜的演講口才培訓,感觸頗多、受益匪淺??偨Y起...
    蔡暢_a8cd閱讀 368評論 1 3
  • 王愛麗 焦點網絡中級五期 洛陽18—3—21持續(xù)分享第303天(春雪原創(chuàng)分享第437天) 昨天下午上完課,看到了一...
    春雪ly閱讀 488評論 0 1

友情鏈接更多精彩內容