siddhi事件封裝存入Kafka

1、項目依賴:

 <!-- Build settings: UTF-8 sources, Java 1.8, and one shared version for all io.siddhi artifacts. -->
 <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <siddhi.version>5.1.11</siddhi.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- Siddhi query API; version comes from the siddhi.version property. -->
        <dependency>
            <groupId>io.siddhi</groupId>
            <artifactId>siddhi-query-api</artifactId>
            <version>${siddhi.version}</version>
        </dependency>
        <!-- Kafka client library: supplies the KafkaProducer used by KafkaSink. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <!-- fastjson: used to serialise results with JSON.toJSONString. -->
        <!-- NOTE(review): this is an old fastjson release; check current security advisories before reuse. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.61</version>
        </dependency>
        <!-- Siddhi core: SiddhiManager, SiddhiAppRuntime, Event, InputHandler, StreamCallback. -->
        <dependency>
            <groupId>io.siddhi</groupId>
            <artifactId>siddhi-core</artifactId>
            <version>${siddhi.version}</version>
        </dependency>
        <!-- Siddhi query compiler; version comes from the siddhi.version property. -->
        <dependency>
            <groupId>io.siddhi</groupId>
            <artifactId>siddhi-query-compiler</artifactId>
            <version>${siddhi.version}</version>
        </dependency>
        <!-- JUnit 4, test scope only. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
            <version>4.12</version>
        </dependency>
    </dependencies>

2、示例項目:

import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;
import io.siddhi.core.event.Event;
import io.siddhi.core.stream.input.InputHandler;
import io.siddhi.core.stream.output.StreamCallback;
/**
 * Minimal Siddhi example: defines an input stream, runs two external-time batch
 * queries over it and prints every event that reaches {@code outputStream}.
 */
public class HelloWorld {
    /**
     * Builds the Siddhi app, feeds it a fixed set of sample events and prints the query output.
     *
     * @param args unused
     * @throws InterruptedException if sending an event to the input handler is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        SiddhiManager siddhiManager = new SiddhiManager();
        // Two queries over 60-second external-time batches keyed on the event's own "time" attribute;
        // they differ only in the HAVING threshold (count>=2 vs count>=3) and both write to outputStream.
        String siddhiApp =
                "define stream cseEventStream (id string,symbol string, price float, volume int,time long); " +
                        "from cseEventStream#window.externalTimeBatch(time,60 sec) select volume,symbol,count(id) as count  group by id having count>=2 insert into outputStream;"
                        + "from cseEventStream#window.externalTimeBatch(time,60 sec) select volume,symbol,count(id) as count  group by id having count>=3 insert into outputStream;";
        try {
            // Generating runtime
            SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
            try {
                // Register the output callback BEFORE starting the runtime so that no output
                // event can be produced while nothing is listening.
                siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
                    @Override
                    public void receive(Event[] events) {
                        for (Event event : events) {
                            System.out.println(event);
                        }
                    }
                });
                InputHandler inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream");
                siddhiAppRuntime.start();

                // Sending events to Siddhi (attribute order: id, symbol, price, volume, time in epoch millis).
                inputHandler.send(new Object[]{"3", "Welcome3", 700f, 800, 1575011611229L}); // 2019-11-29 15:13:31 (UTC+8)
                inputHandler.send(new Object[]{"3", "Welcome3", 700f, 100, 1575011612229L}); // 2019-11-29 15:13:32 (UTC+8)
                inputHandler.send(new Object[]{"3", "Welcome3", 700f, 100, 1575011612229L}); // 2019-11-29 15:13:32 (UTC+8)
                inputHandler.send(new Object[]{"3", "Welcome3", 700f, 900, 1575015212000L}); // 2019-11-29 16:13:32 (UTC+8)
                inputHandler.send(new Object[]{"3", "Welcome3", 700f, 900, 1575015212000L}); // 2019-11-29 16:13:32 (UTC+8)
                inputHandler.send(new Object[]{"3", "Welcome3", 700f, 900, 1575111612330L}); // 2019-11-30 19:00:12 (UTC+8)

                inputHandler.send(new Object[]{"4", "to", 50f, 30, 1575011639158L});
                inputHandler.send(new Object[]{"5", "IBM", 76.6f, 400, 1575011639158L});
                inputHandler.send(new Object[]{"6", "siddhi!", 45.6f, 200, 1575011639158L});
            } finally {
                // Always release the runtime, even when a send fails.
                siddhiAppRuntime.shutdown();
            }
        } finally {
            siddhiManager.shutdown();
        }
    }
}

3、在回調函數中將event封裝成固定對象輸出到Kafka。此處需要修改addCallback中的函數。著急的可以直接先看第四點后記,防止像我一樣繞彎。

封裝的對象Result:

/**
 * Plain data holder for one output event of the query
 * {@code select volume, symbol, count(id) as count}, suitable for JSON serialisation.
 */
public class Result implements Serializable {
    private int volume;
    private String symbol;
    private int count;

    /**
     * Builds a result from a Siddhi output event whose attributes are, in order,
     * {@code volume}, {@code symbol} and {@code count}.
     *
     * <p>Numeric attributes are read through {@link Number} rather than cast with
     * {@code (int)}: {@code event.getData(i)} returns {@code Object}, and the
     * {@code count} attribute arrives as a {@code Long}, so a direct {@code (int)}
     * cast throws {@code ClassCastException} inside the callback.
     *
     * @param event the output event to copy values from
     */
    public Result(Event event) {
        this.volume = ((Number) event.getData(0)).intValue();
        this.symbol = (String) event.getData(1);
        // The count is a long on the wire; it is narrowed to int here on purpose.
        this.count = ((Number) event.getData(2)).intValue();
    }

    /** Creates an empty result; populate it through the setters. */
    public Result() {
    }

    /** @return the volume attribute of the event */
    public int getVolume() {
        return volume;
    }

    /** @param volume the volume attribute to store */
    public void setVolume(int volume) {
        this.volume = volume;
    }

    /** @return the symbol attribute of the event */
    public String getSymbol() {
        return symbol;
    }

    /** @param symbol the symbol attribute to store */
    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }

    /** @return the aggregated event count */
    public int getCount() {
        return count;
    }

    /** @param count the aggregated event count to store */
    public void setCount(int count) {
        this.count = count;
    }

    /** @return a debug representation of the form {@code Result{volume=…, symbol='…', count=…}} */
    @Override
    public String toString() {
        return "Result{" +
                "volume=" + volume +
                ", symbol='" + symbol + '\'' +
                ", count=" + count +
                '}';
    }
}

在測試時將addCallback改成

siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
            // Wraps each output event in a Result and prints its string form.
            @Override
            public void receive(Event[] events) {
                final int total = events.length;
                for (int i = 0; i < total; i++) {
                    Result wrapped = new Result(events[i]);
                    String text = wrapped.toString();
                    System.out.println(text);
                }
            }
        });

本來想著直接new一個Result對象還比較快,不用一個個去set。結果什么也沒有輸出,也沒有報錯,很納悶。
然后只好試試set。代碼改成:

  // Second attempt: fill the Result through its setters instead of the Event constructor.
  siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                for (Event event : events) {
                    Result result = new Result();
                    result.setVolume((int) event.getData(0));
                    result.setSymbol((String) event.getData(1));
                    // NOTE(review): per the write-up, this attribute arrives as a long, so the (int)
                    // cast fails and nothing is printed; the value has to be converted explicitly.
                    result.setCount((int) event.getData(2));
                    System.out.println(result.toString());
                }
            }
        });

結果還是沒有輸出,這就很奇怪了。

測試了很久發現,這個回調函數的event數據類型很嚴格:event.getData()返回的是Object,count(id)的值實際是Long,不能通過加(int)強制轉換成int。必須通過 Integer.parseInt(event.getData(2).toString())這樣才行啊。累死終于找到問題了。

最后轉化為json寫入Kafka即可。參考代碼:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Static helper that publishes string payloads to a fixed Kafka topic through one shared producer.
 */
public class KafkaSink {
    /** Topic every payload is written to. */
    private static final String TOPIC = "test";

    /** Single producer shared by all callers; created once when the class is loaded. */
    private static final Producer<String, String> producer = new KafkaProducer<String, String>(getProperties());

    /**
     * Builds the producer configuration: a local broker, string key/value serializers,
     * up to 3 retries and leader acknowledgement.
     *
     * @return a fresh {@link Properties} instance holding the producer settings
     */
    public static Properties getProperties() {
        Properties properties = new Properties();
        properties.put("retries", 3);
        // acks must not be "0": with no acknowledgement the retries setting has no effect and
        // the blocking get() in send() could never report a failed delivery.
        properties.put("acks", "1");
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Values are plain strings.
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    /**
     * Sends one record to the topic and blocks until the send has completed.
     *
     * @param data the record value to publish
     * @throws ExecutionException   if the send failed
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void send(String data) throws ExecutionException, InterruptedException {
        producer.send(new ProducerRecord<>(TOPIC, data)).get();
    }
}



===========================
回調函數改成:
  siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
            // Converts every output event to a Result, serialises it to JSON and publishes it to Kafka.
            @Override
            public void receive(Event[] events) {
                for (Event event : events) {
                    Result result = new Result(event);
                    try {
                        KafkaSink.send(JSON.toJSONString(result));
                    } catch (ExecutionException e) {
                        // This record could not be delivered; report it and carry on with the next event.
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the owning thread can see it, and stop
                        // processing this batch instead of silently swallowing the interruption.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                        return;
                    }
                }
            }
        });

4、后記:

后來發現回調類里面有轉換為map的函數(toMap),map值的類型也不會亂,可以直接轉換為map后轉化為json。如下:

 siddhiAppRuntime.addCallback("outputStream", new StreamCallback() {
            // Prints each output event as JSON, using the callback's own toMap conversion.
            @Override
            public void receive(Event[] events) {
                final int total = events.length;
                for (int i = 0; i < total; i++) {
                    String json = JSON.toJSONString(toMap(events[i]));
                    System.out.println(json);
                }
            }
        });
最后編輯于
©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容