背景
剛開始接觸 FlinkSQL 時,對 LAST_VALUE 特別好奇,雖然工作當中有在用到,但還是特別的想知道它是怎么實現(xiàn)的,今天終于可以總結(jié)一下
原理
當我們寫入如下類似的 sql 時,就會用到 LAST_VALUE 函數(shù)
select LAST_VALUE(status) from temp;
LAST_VALUE 函數(shù)對應(yīng)的具體類為 LastValueWithRetractAggFunction。
LAST_VALUE函數(shù)之所以能夠起作用最關(guān)鍵的是
/** Accumulator for LAST_VALUE with retraction. */
public static class LastValueWithRetractAccumulator<T> {
public T lastValue = null;
public Long lastOrder = null;
// value timestamp
public MapView<T, List<Long>> valueToOrderMap = new MapView<>();
// timestamp value
public MapView<Long, List<T>> orderToValueMap = new MapView<>();
......
}
@SuppressWarnings("unchecked")
public void accumulate(LastValueWithRetractAccumulator<T> acc, Object value) throws Exception {
if (value != null) {//傳進來的是 null 不做任何操作
T v = (T) value;
Long order = System.currentTimeMillis();
List<Long> orderList = acc.valueToOrderMap.get(v);
if (orderList == null) {
orderList = new ArrayList<>();
}
orderList.add(order);
acc.valueToOrderMap.put(v, orderList);
accumulate(acc, value, order);
}
}
@SuppressWarnings("unchecked")
public void accumulate(LastValueWithRetractAccumulator<T> acc, Object value, Long order)
throws Exception {
if (value != null) {
T v = (T) value;
Long prevOrder = acc.lastOrder;// 默認是 null
if (prevOrder == null || prevOrder <= order) {//類似鏈表頭插法
acc.lastValue = v;
acc.lastOrder = order;
}
List<T> valueList = acc.orderToValueMap.get(order);
if (valueList == null) {
valueList = new ArrayList<>();
}
valueList.add(v);
acc.orderToValueMap.put(order, valueList);
}
}
@SuppressWarnings("unchecked")
public void retract(LastValueWithRetractAccumulator<T> acc, Object value) throws Exception {
if (value != null) {
T v = (T) value;
List<Long> orderList = acc.valueToOrderMap.get(v);// 查出所有的 timestamp
if (orderList != null && orderList.size() > 0) {// 說明之前已經(jīng)發(fā)出過了.此刻該 retract
Long order = orderList.get(0);
orderList.remove(0);//最早進入的那個 value 對應(yīng)的 timestamp remove
if (orderList.isEmpty()) {//說明該 value 有且僅進入了一次
acc.valueToOrderMap.remove(v);
} else {
acc.valueToOrderMap.put(v, orderList);
}
retract(acc, value, order);
}
}
}
@SuppressWarnings("unchecked")
public void retract(LastValueWithRetractAccumulator<T> acc, Object value, Long order)
throws Exception {
if (value != null) {
T v = (T) value;
List<T> valueList = acc.orderToValueMap.get(order);//取出相同 timestamp 對應(yīng)的所有 value
if (valueList == null) {
return;
}
int index = valueList.indexOf(v);// 找到對應(yīng)的 value 并將其刪除
if (index >= 0) {
valueList.remove(index);
if (valueList.isEmpty()) {
acc.orderToValueMap.remove(order);
} else {
acc.orderToValueMap.put(order, valueList);
}
}
if (v.equals(acc.lastValue)) {
Long startKey = acc.lastOrder;
Iterator<Long> iter = acc.orderToValueMap.keys().iterator();
// find the maximal order which is less than or equal to `startKey`
//找到小于要刪除值對應(yīng)時間戳的最大值
Long nextKey = Long.MIN_VALUE;
while (iter.hasNext()) {
Long key = iter.next();
if (key <= startKey && key > nextKey) {
nextKey = key;
}
}
if (nextKey != Long.MIN_VALUE) {
List<T> values = acc.orderToValueMap.get(nextKey);
acc.lastValue = values.get(values.size() - 1);
acc.lastOrder = nextKey;
} else {
acc.lastValue = null;
acc.lastOrder = null;
}
}
}
}
首先呢是兩個 MapView valueToOrderMap、orderToValueMap
valueToOrderMap 值( 此刻最終的結(jié)果 )---->消息進入accumulate 方法的系統(tǒng)時間戳
orderToValueMap 消息進入accumulate 方法的系統(tǒng)時間戳 ----->值( 此刻最終的結(jié)果 )
當 RowData( 內(nèi)部使用 )對應(yīng)的 rowKind 為 insert 或者 update_after 時,會進入 accumulate(LastValueWithRetractAccumulator<T> acc, Object value) 方法。accumulate 方法相對比較簡單其實就是分別對 valueToOrderMap、orderToValueMap 進行賦值。
當 RowData( 內(nèi)部使用 )對應(yīng)的 rowKind 為 delete 或者 update_before 時,會進入 retract(LastValueWithRetractAccumulator<T> acc, Object value) 方法,主要是操作 valueToOrderMap 刪除之前已經(jīng)發(fā)出去的消息記錄,然后進入 retract(LastValueWithRetractAccumulator<T> acc, Object value, Long order),主要就是操作 orderToValueMap 刪除對應(yīng)時間戳的值,然后找出 不大于要刪除數(shù)據(jù)對應(yīng)時間戳的最大時間戳,下一步要 retract 就該它了
總結(jié)
其實就是通過 時間戳 來進行判斷的