一文搞懂 FlinkSQL函數(shù) LAST_VALUE 的原理

背景

剛開始接觸 FlinkSQL 時,對 LAST_VALUE 特別好奇,雖然工作當中有在用到,但還是特別的想知道它是怎么實現(xiàn)的,今天終于可以總結(jié)一下

原理

當我們寫入如下類似的 sql 時,就會用到 LAST_VALUE 函數(shù)

select LAST_VALUE(status) from temp;

LAST_VALUE 函數(shù)對應(yīng)的具體類為 LastValueWithRetractAggFunction。
LAST_VALUE函數(shù)之所以能夠起作用最關(guān)鍵的是

 /** Accumulator for LAST_VALUE with retraction. */
    public static class LastValueWithRetractAccumulator<T> {
        public T lastValue = null;
        public Long lastOrder = null;
        // value timestamp
        public MapView<T, List<Long>> valueToOrderMap = new MapView<>();
        // timestamp value
        public MapView<Long, List<T>> orderToValueMap = new MapView<>();

       ......
    }

    @SuppressWarnings("unchecked")
    public void accumulate(LastValueWithRetractAccumulator<T> acc, Object value) throws Exception {
        if (value != null) {//傳進來的是 null 不做任何操作
            T v = (T) value;
            Long order = System.currentTimeMillis();
            List<Long> orderList = acc.valueToOrderMap.get(v);
            if (orderList == null) {
                orderList = new ArrayList<>();
            }
            orderList.add(order);
            acc.valueToOrderMap.put(v, orderList);
            accumulate(acc, value, order);
        }
    }

    @SuppressWarnings("unchecked")
    public void accumulate(LastValueWithRetractAccumulator<T> acc, Object value, Long order)
            throws Exception {
        if (value != null) {
            T v = (T) value;
            Long prevOrder = acc.lastOrder;// 默認是 null
            if (prevOrder == null || prevOrder <= order) {//類似鏈表頭插法
                acc.lastValue = v;
                acc.lastOrder = order;
            }

            List<T> valueList = acc.orderToValueMap.get(order);
            if (valueList == null) {
                valueList = new ArrayList<>();
            }
            valueList.add(v);
            acc.orderToValueMap.put(order, valueList);
        }
    }

    @SuppressWarnings("unchecked")
    public void retract(LastValueWithRetractAccumulator<T> acc, Object value) throws Exception {
        if (value != null) {
            T v = (T) value;
            List<Long> orderList = acc.valueToOrderMap.get(v);// 查出所有的 timestamp
            if (orderList != null && orderList.size() > 0) {// 說明之前已經(jīng)發(fā)出過了.此刻該 retract
                Long order = orderList.get(0);
                orderList.remove(0);//最早進入的那個 value 對應(yīng)的 timestamp remove
                if (orderList.isEmpty()) {//說明該 value 有且僅進入了一次
                    acc.valueToOrderMap.remove(v);
                } else {
                    acc.valueToOrderMap.put(v, orderList);
                }
                retract(acc, value, order);
            }
        }
    }

    @SuppressWarnings("unchecked")
    public void retract(LastValueWithRetractAccumulator<T> acc, Object value, Long order)
            throws Exception {
        if (value != null) {
            T v = (T) value;
            List<T> valueList = acc.orderToValueMap.get(order);//取出相同 timestamp 對應(yīng)的所有 value
            if (valueList == null) {
                return;
            }
            int index = valueList.indexOf(v);// 找到對應(yīng)的 value 并將其刪除
            if (index >= 0) {
                valueList.remove(index);
                if (valueList.isEmpty()) {
                    acc.orderToValueMap.remove(order);
                } else {
                    acc.orderToValueMap.put(order, valueList);
                }
            }
            if (v.equals(acc.lastValue)) {
                Long startKey = acc.lastOrder;
                Iterator<Long> iter = acc.orderToValueMap.keys().iterator();
                // find the maximal order which is less than or equal to `startKey`
                //找到小于要刪除值對應(yīng)時間戳的最大值
                Long nextKey = Long.MIN_VALUE;
                while (iter.hasNext()) {
                    Long key = iter.next();
                    if (key <= startKey && key > nextKey) {
                        nextKey = key;
                    }
                }

                if (nextKey != Long.MIN_VALUE) {
                    List<T> values = acc.orderToValueMap.get(nextKey);
                    acc.lastValue = values.get(values.size() - 1);
                    acc.lastOrder = nextKey;
                } else {
                    acc.lastValue = null;
                    acc.lastOrder = null;
                }
            }
        }
    }

首先呢是兩個 MapView valueToOrderMap、orderToValueMap

valueToOrderMap 值( 此刻最終的結(jié)果 )---->消息進入accumulate 方法的系統(tǒng)時間戳
orderToValueMap 消息進入accumulate 方法的系統(tǒng)時間戳 ----->值( 此刻最終的結(jié)果 )

當 RowData( 內(nèi)部使用 )對應(yīng)的 rowKind 為 insert 或者 update_after 時,會進入 accumulate(LastValueWithRetractAccumulator<T> acc, Object value) 方法。accumulate 方法相對比較簡單其實就是分別對 valueToOrderMap、orderToValueMap 進行賦值。

當 RowData( 內(nèi)部使用 )對應(yīng)的 rowKind 為 delete 或者 update_before 時,會進入 retract(LastValueWithRetractAccumulator<T> acc, Object value) 方法,主要是操作 valueToOrderMap 刪除之前已經(jīng)發(fā)出去的消息記錄,然后進入 retract(LastValueWithRetractAccumulator<T> acc, Object value, Long order),主要就是操作 orderToValueMap 刪除對應(yīng)時間戳的值,然后找出 不大于要刪除數(shù)據(jù)對應(yīng)時間戳的最大時間戳,下一步要 retract 就該它了

總結(jié)

其實就是通過 時間戳 來進行判斷的

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容