Introduction
To define a custom data source, implement the methods of the SourceFunction interface: run() and cancel().
1. Create the environment
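public static void main(String[] args) throws Exception {
    // Obtain the stream execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Register the custom source
    DataStreamSource<SensorReading> streamSource = env.addSource(new MySensorSource());
    // Print every reading to stdout
    streamSource.print();
    // The job only starts once execute() is called
    env.execute();
}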
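The examples in this section use a SensorReading type that is never defined here. The following is a minimal sketch of what it might look like, assuming three fields inferred from the constructor call in MySensorSource (sensor id, timestamp, temperature). The field and accessor names are assumptions, not taken from the original.

```java
// Hypothetical SensorReading type; field names are assumed for illustration.
// Flink's POJO rules expect a public class in its own file; the modifier is
// left off here only so the sketch compiles as a single snippet.
class SensorReading {
    private String id;
    private long timestamp;
    private double temperature;

    // A public no-argument constructor is needed for Flink to treat this as a POJO
    public SensorReading() {
    }

    public SensorReading(String id, long timestamp, double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }

    public double getTemperature() { return temperature; }
    public void setTemperature(double temperature) { this.temperature = temperature; }

    // print() on the stream calls toString(), so a readable form helps
    @Override
    public String toString() {
        return "SensorReading{id='" + id + "', timestamp=" + timestamp
                + ", temperature=" + temperature + "}";
    }
}
```

The argument order of the three-argument constructor matches the call new SensorReading(sensorId, System.currentTimeMillis(), newtemp) used by the source.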
2. Implement the SourceFunction interface
public static class MySensorSource implements SourceFunction<SensorReading> {
    // Flag that controls the emit loop; volatile because cancel() is called from another thread
    private volatile boolean running = true;
    // Random number generator
    private final Random random = new Random();

    @Override
    public void run(SourceContext<SensorReading> ctx) throws Exception {
        // Set the initial temperature of 10 sensors
        HashMap<String, Double> sensorTempMap = new HashMap<String, Double>();
        for (int i = 0; i < 10; i++) {
            sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20);
        }
        while (running) {
            for (String sensorId : sensorTempMap.keySet()) {
                // Apply a random fluctuation to the current temperature
                Double newtemp = sensorTempMap.get(sensorId) + random.nextGaussian();
                sensorTempMap.put(sensorId, newtemp);
                ctx.collect(new SensorReading(sensorId, System.currentTimeMillis(), newtemp));
            }
            // Throttle the output rate: one batch per second
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
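The running flag is written by cancel() and read by run(), and Flink invokes cancel() from a different thread than the one executing run(), so the flag is normally declared volatile. The following is a Flink-free sketch of that pattern using only the JDK. CancellableLoop and startThenCancel are hypothetical names introduced for illustration; the counter stands in for ctx.collect(...).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a source: loops until cancel() flips the flag.
class CancellableLoop implements Runnable {
    // volatile so the write in cancel() is visible to the thread executing run()
    private volatile boolean running = true;
    private final AtomicInteger emitted = new AtomicInteger();
    private final CountDownLatch firstEmit = new CountDownLatch(1);

    @Override
    public void run() {
        while (running) {
            emitted.incrementAndGet();   // stands in for ctx.collect(...)
            firstEmit.countDown();
            try {
                Thread.sleep(10L);       // throttle, shortened from the 1000 ms used above
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public void cancel() {
        running = false;
    }

    public int emittedCount() {
        return emitted.get();
    }

    // Runs the loop on a worker thread, cancels it from the calling thread,
    // and reports whether the worker actually stopped.
    static boolean startThenCancel(CancellableLoop loop) {
        Thread worker = new Thread(loop, "source-thread");
        worker.start();
        try {
            loop.firstEmit.await(5, TimeUnit.SECONDS); // wait for at least one emit
            loop.cancel();
            worker.join(5000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }
}
```

Calling CancellableLoop.startThenCancel(new CancellableLoop()) returns true once the worker has left its loop. Without volatile, the Java memory model does not guarantee that the looping thread ever observes the updated flag.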