Data Pipelines & ETL

Flink一個最常見的use case就是做ETL。

1. Stateless Transformation

無狀態(tài)的轉(zhuǎn)換最基礎的操作就是map和flatMap.
map操作執(zhí)行的是一對一的轉(zhuǎn)換,即對于每個stream中的元素都會輸出一個轉(zhuǎn)換后的元素。

public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {

    @Override
    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
        return new EnrichedRide(taxiRide);
    }
}

而flatMap通過一個Collector接收輸出,所以輸出的元素數(shù)量可以與輸入的不一致。

public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {

    @Override
    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
        if (valid.filter(taxiRide)) {
            out.collect(new EnrichedRide(taxiRide));
        }
    }
}

2. Keyed Streams

  • KeyBy()
    根據(jù)元素的某個屬性進行分區(qū),就像group by一樣,通常這會導致昂貴的網(wǎng)絡交換,序列化以及反序列化
  • Keys are computed
    也可以將多個屬性的計算結果作為key, 但為了在需要的時候重新計算key要保證每次計算的結果都是相同的
keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat));
  • Aggregations on Keyed Streams
import org.joda.time.Interval;

DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {

        @Override
        public void flatMap(EnrichedRide ride,
                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
            if (!ride.isStart) {
                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
                Minutes duration = rideInterval.toDuration().toStandardMinutes();
                out.collect(new Tuple2<>(ride.startCell, duration));
            }
        }
    });

minutesByStartCell
  .keyBy(value -> value.f0) // .keyBy(value -> value.startCell)
  .maxBy(1) // duration
  .print();

先將stream按照startCell分組,再對每組partition做聚合運算。上例會實時更新每個startCell的max duration

  • Implicit State
    在上例中程序維護了一個implicit的state, 即每個key的max duration.在這個例子中state很簡單,但在實際生產(chǎn)中,我們最好一個時間窗口內(nèi)保存state,而非在整個stream中。以避免state過大。

3. Stateful Transformations

  • Rich Functions
    rich functions, 如RichFlatMapFunction,包含了額外的方法,如:
    open(Configuration c): 只在operator初始化時調(diào)用一次,可以用來加載靜態(tài)數(shù)據(jù),或建立與外部服務的連接
    close():
    getRuntimeContext(): 可以創(chuàng)建或獲取由Flink管理的state

  • Keyed State的例子

private static class Event {
    public final String key;
    public final long timestamp;
    ...
}

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  
    env.addSource(new EventSource())
        .keyBy(e -> e.key)
        .flatMap(new Deduplicator())
        .print();
  
    env.execute();
}

public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
    ValueState<Boolean> keyHasBeenSeen;

    @Override
    public void open(Configuration conf) {
        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
        keyHasBeenSeen = getRuntimeContext().getState(desc);
    }

    @Override
    public void flatMap(Event event, Collector<Event> out) throws Exception {
        if (keyHasBeenSeen.value() == null) {
            out.collect(event);
            keyHasBeenSeen.update(true);
        }
    }
}

Flink支持多種類型的keyed state, 本例中使用的是最簡單的valueState. 對于每一個key, flink會維護一個對象。程序剛啟動時,調(diào)用open()方法,還沒有event, 也就沒有key. 后面event出現(xiàn)調(diào)用flatMap時,可以獲取到key,就可以用于在flink的state后端做判斷。
部署到分布式集群上時,會有很多個Deduplicator 實例,每一個都只對整個keyspace上互不相關的state負責,因此當你看見一個valueState時,要明白這不止代表一個Boolean對象,而是一個分布式的共享的key-value store.

  • Clearing State
    如果例子中的key是無界的,我們就需要手動清理state, 這通過clear()方法實現(xiàn)
keyHasBeenSeen.clear();

你可以指定一個Timer來執(zhí)行這個操作,或者指定valueState的Time-To-Live參數(shù)

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
  • Non-keyed State
    有時候我們也會需要管理non-keyed的state, 這通常稱作operator state

4. Connected Stream

一個operator可以有兩個及以上的source, 其中一個是data, 另一個可以是rules, thresholds或者其他參數(shù)等。也可以用作Streaming joins.



要注意的是兩個連接在一起的stream必須要有兼容的key.

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> control = env
        .fromElements("DROP", "IGNORE")
        .keyBy(x -> x);

    DataStream<String> streamOfWords = env
        .fromElements("Apache", "DROP", "Flink", "IGNORE")
        .keyBy(x -> x);
  
    control
        .connect(streamOfWords)
        .flatMap(new ControlFunction())
        .print();

    env.execute();
}

public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
    private ValueState<Boolean> blocked;
      
    @Override
    public void open(Configuration config) {
        blocked = getRuntimeContext()
            .getState(new ValueStateDescriptor<>("blocked", Boolean.class));
    }
      
    @Override
    public void flatMap1(String control_value, Collector<String> out) throws Exception {
        blocked.update(Boolean.TRUE);
    }
      
    @Override
    public void flatMap2(String data_value, Collector<String> out) throws Exception {
        if (blocked.value() == null) {
            out.collect(data_value);
        }
    }
}

對element執(zhí)行flatMap1還是flatMap2是根據(jù)兩個stream connect的順序決定的。本例中control.connect(streamOfWords) 所以connect走flatMap1,dataStream走flatMap2. 但是你是沒有辦法控制flatMap1和flatMap2執(zhí)行的順序的,因為兩個stream是競爭的關系,完全由Flink運行時決定的。所以如果順序或者執(zhí)行時間很重要的情境下,最好先將events緩存在flink state中,或者通過InputSelectable 接口指定執(zhí)行的順序。

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容