Kafka Stream簡單示例(四)---定義更通用的Serde

本篇是在《Kafka Stream簡單示例(一)》《Kafka Stream簡單示例(二)---聚合 Aggregation--統(tǒng)計總和》 以及《 Kafka Stream簡單示例(三)---自定義Serde》基礎(chǔ)上成文的,建議先閱讀前三篇,以便清楚上下文關(guān)系需求背景。

第三篇 《Kafka Stream簡單示例(三)---自定義Serde》中,我們自定義了Statistic類的Serializer和Deserializer?,F(xiàn)實中我們可能需要多個類都支持序列化和反序列,能否有泛型的Serializer和Deserializer,直接放入自己的類就可以完成工作?答案是,如果你的類,是POJO類型的,使用泛型JsonPOJOSerializer和JsonPOJODeserializer就可以。

注意:示例中的代碼只是展示流程,非生產(chǎn)代碼,僅供參考,由此導(dǎo)致的問題本人概不負(fù)責(zé)。

官方文檔在這里,我用是kafka 1.0. 所以連接也是1.0版本的文檔。 http://kafka.apache.org/10/documentation/streams/developer-guide/datatypes.html

項目需求

統(tǒng)計一分鐘內(nèi)(固定時間窗口Tumbling Window)內(nèi)溫度的總和與平均值。類似的還有,最大值,最小值。

主要流程和代碼

完整的代碼在這里,歡迎加星和fork。 謝謝!

一個結(jié)果中必須同時含有總和與平均值,于是我們設(shè)計一個簡單數(shù)據(jù)結(jié)構(gòu)

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Statistics {
    private Long avg;
    private Long sum;
    private Long count;
}

細(xì)心的人會發(fā)現(xiàn)本篇中的Statistic比《 Kafka Stream簡單示例(三)---自定義Serde》中的Statistic多一個@NoArgsConstructor注解,這是因為我們后面使用反序列化是需要生成Statistics對象(使用默認(rèn)的無參構(gòu)造函數(shù)生成)。 因此需要添加@NoArgsConstructor注解。

根據(jù)Serdes的要求,我們必須提供對應(yīng)的Serializer和Deserializer。


SerdesClass.png

我們需要實現(xiàn)JsonPOJOSerializer和JsonPOJODeserializer。仍然才考LongSerializer和LongDeserializer的實現(xiàn), 我們實現(xiàn)了StatisticsSerializer和StatisticsDeserializer。
首先是序列化實現(xiàn)JsonPOJOSerializer

package com.yq.generic;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.errors.SerializationException;

import java.util.Map;

/**
 * 這個是官方例子的copy, 版權(quán)歸官方。copy到本地是為了讓我的例子也運(yùn)行起來
 * https://github.com/apache/kafka/blob/1.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
 * className: JsonPOJOSerializer
 *
 */


public class JsonPOJOSerializer<T> implements Serializer<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Default constructor needed by Kafka
     */
    public JsonPOJOSerializer() {
    }

    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null)
            return null;

        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }

    @Override
    public void close() {
    }

}


其次是反序列化實現(xiàn)JsonPOJODeserializer

package com.yq.generic;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.errors.SerializationException;

import java.util.Map;

/**
 * 這個是官方例子的copy, 版權(quán)歸官方。copy到本地是為了讓我的例子也運(yùn)行起來
 * https://github.com/apache/kafka/blob/1.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
 * className: JsonPOJOSerializer
 *
 */
public class JsonPOJODeserializer<T> implements Deserializer<T> {
    private ObjectMapper objectMapper = new ObjectMapper();

    private Class<T> tClass;

    /**
     * Default constructor needed by Kafka
     */
    public JsonPOJODeserializer() {
    }

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
        tClass = (Class<T>) props.get("JsonPOJOClass");
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (bytes == null)
            return null;

        T data;
        try {
            data = objectMapper.readValue(bytes, tClass);
        } catch (Exception e) {
            throw new SerializationException(e);
        }

        return data;
    }

    @Override
    public void close() {

    }
}


最后是我們的主流程。
第一步,我們需要先定義

final Serializer<Statistics> statisticsSerializer = new JsonPOJOSerializer<>();
        serdeProps.put("JsonPOJOClass", Statistics.class);
        statisticsSerializer.configure(serdeProps, false);

        final Deserializer<Statistics> statisticsDeserializer = new JsonPOJODeserializer<>();
        serdeProps.put("JsonPOJOClass", Statistics.class);
        statisticsDeserializer.configure(serdeProps, false);

        final Serde<Statistics> statisticsSerde = Serdes.serdeFrom(statisticsSerializer, statisticsDeserializer);

第二步。 就像SerDe內(nèi)置的Serdes.Long()或者 Serdes.String(), 可以直接使用statisticsSerde。

KTable的格式是 KTable<Windowed<String>, Statistics>。 aggregate函數(shù)的初始值和返回都是Statistics類型, 結(jié)果存儲的格式Materialized.<String, Statistics, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-temp-stream-store")
.withValueSerde(statisticsSerde) , 也是Statistics類型。

package com.yq.generic;


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yq.customized.Statistics;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.WindowedDeserializer;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.state.WindowStore;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 *  http://kafka.apache.org/10/documentation/streams/developer-guide/datatypes.html
 * 統(tǒng)計60秒內(nèi),溫度值的最大值  topic中的消息格式為數(shù)字,30, 21或者{"temp":19, "humidity": 25}
 */
public class TemperatureAvgGenericSerDeDemo {
    private static final int TEMPERATURE_WINDOW_SIZE = 60;

    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-temp-avg");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream("iot-temp");

        Map<String, Object> serdeProps = new HashMap<>();

        final Serializer<Statistics> statisticsSerializer = new JsonPOJOSerializer<>();
        serdeProps.put("JsonPOJOClass", Statistics.class);
        statisticsSerializer.configure(serdeProps, false);

        final Deserializer<Statistics> statisticsDeserializer = new JsonPOJODeserializer<>();
        serdeProps.put("JsonPOJOClass", Statistics.class);
        statisticsDeserializer.configure(serdeProps, false);

        final Serde<Statistics> statisticsSerde = Serdes.serdeFrom(statisticsSerializer, statisticsDeserializer);

        KTable<Windowed<String>, Statistics> max = source
                .selectKey(new KeyValueMapper<String, String, String>() {
                    @Override
                    public String apply(String key, String value) {
                        return "stat";
                    }
                })
                .groupByKey()
                .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE)))
                .aggregate(
                        new Initializer<Statistics>() {
                            @Override
                            public Statistics apply() {
                                Statistics avgAndSum = new Statistics(0L,0L,0L);
                                return avgAndSum;
                            }
                        },
                        new Aggregator<String, String, Statistics>() {
                            @Override
                            public Statistics apply(String aggKey, String newValue, Statistics aggValue) {
                                //topic中的消息格式為{"temp":19, "humidity": 25}
                                System.out.println("aggKey:" + aggKey + ",  newValue:" + newValue + ", aggKey:" + aggValue);
                                Long newValueLong = null;
                                try {
                                    JSONObject json = JSON.parseObject(newValue);
                                    newValueLong = json.getLong("temp");
                                }
                                catch (ClassCastException ex) {
                                    try {
                                        newValueLong = Long.valueOf(newValue);
                                    }
                                     catch (NumberFormatException e) {
                                         System.out.println("Exception:" + e.getMessage());
                                         //異常返回原值
                                         return aggValue;
                                    }
                                }
                                catch (Exception e) {
                                    System.out.println("Exception:" + e.getMessage());
                                    //異常返回原值
                                    return aggValue;
                                }


                                aggValue.setCount(aggValue.getCount() + 1);
                                aggValue.setSum(aggValue.getSum() + newValueLong);
                                aggValue.setAvg(aggValue.getSum() / aggValue.getCount());

                                return aggValue;
                            }
                        },
                        Materialized.<String, Statistics, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-temp-stream-store")
                                .withValueSerde(statisticsSerde)
                );

        WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(Serdes.String().serializer());
        WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(Serdes.String().deserializer(), TEMPERATURE_WINDOW_SIZE);
        Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);

        max.toStream().to("iot-temp-stat", Produced.with(windowedSerde, statisticsSerde));

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        final CountDownLatch latch = new CountDownLatch(1);


        Runtime.getRuntime().addShutdownHook(new Thread("streams-temperature-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

效果截圖

圖中已經(jīng)有文字說明,結(jié)合代碼能更清楚了解Kafka Stream。


GenericPOJOSer.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 他是一個四川小伙,出生于1987年,目前已安家在浙江臺州。他與妻子共同經(jīng)營一家生產(chǎn)戶外用品的企業(yè),廠里有工人30人...
    從心處發(fā)閱讀 226評論 1 2
  • 只要我喜歡這女孩,就沒有什么婆媳矛盾。報紙上曾看到一個阿姨替兒子找對象時對記者說了這樣的話。一語道破婆媳不好相處的...
    小雛菊_0af6閱讀 233評論 0 0
  • 我們是在我本科暑假支教隊里認(rèn)識的,大家都不是最初在隊里熟悉起來的伙伴,后來卻意外的投緣,三個人有了小團(tuán)體和友誼。一...
    番茄和圣女果的關(guān)系閱讀 292評論 0 0
  • 很多時候需要簡單的限制一下文本輸入框的文本長度.寫多了類似的代碼之后,想把它封裝一下一勞永逸.封裝后將一行代碼搞定...
    Fsn_soul閱讀 481評論 0 0

友情鏈接更多精彩內(nèi)容