- CEP和状态机
- 状态机的表示和如何作用在流上
Flink中CEP的一般代码结构如下:
// Demo input: a small bounded stream mixing Event and SubEvent instances.
// Only the elements named "start", "middle" and "end" are relevant to the
// pattern defined below; the others are there as noise.
val input = {
  val events: Seq[Event] = Seq(
    new Event(1, "barfoo", 1.0),
    new Event(2, "start", 2.0),
    new Event(3, "foobar", 3.0),
    new SubEvent(4, "foo", 4.0, 1.0),
    new Event(5, "middle", 5.0),
    new SubEvent(6, "middle", 6.0, 2.0),
    new SubEvent(7, "bar", 3.0, 3.0),
    new Event(42, "42", 42.0),
    new Event(8, "end", 1.0)
  )
  // Expand the sequence into the varargs parameter of fromElements.
  env.fromElements(events: _*)
}
// Three-stage pattern:
//   "start"  - an Event whose name is "start"
//   "middle" - a SubEvent whose name is "middle" (stages linked with followedByAny)
//   "end"    - an Event whose name is "end"     (stages linked with followedByAny)
// Names are compared with `==`, which is null-safe in Scala; calling
// `.equals` on a null name would throw a NullPointerException instead.
val pattern: Pattern[Event, Event] = Pattern.begin[Event]("start")
  .where(new SimpleCondition[Event] {
    override def filter(e: Event): Boolean = e.name == "start"
  })
  .followedByAny("middle").subtype[SubEvent](classOf[SubEvent])
  .where(new SimpleCondition[SubEvent] {
    override def filter(e: SubEvent): Boolean = e.name == "middle"
  })
  .followedByAny("end")
  .where(new SimpleCondition[Event] {
    override def filter(e: Event): Boolean = e.name == "end"
  })
val patternStream = CEP.pattern(input, pattern)
val result = patternStream.process(
  new PatternProcessFunction[Event, String] {
    /**
     * Renders one complete match as "(stage,events);(stage,events);...".
     *
     * The match arrives as a map keyed by stage name. Converting that map with
     * `asScala` and calling `map` on it builds a new Scala map whose iteration
     * order is unspecified, so the stages could come out in any order. The
     * stages are therefore looked up explicitly, in the order the pattern
     * declares them.
     *
     * @param matchResult events of the match, grouped by pattern stage name
     * @param ctx         context supplied by the CEP operator (unused here)
     * @param out         collector receiving the rendered match
     */
    override def processMatch(
        matchResult: util.Map[String, util.List[Event]],
        ctx: PatternProcessFunction.Context,
        out: Collector[String]): Unit = {
      // Stage names in pattern declaration order.
      val stageOrder = Seq("start", "middle", "end")
      val info = stageOrder
        .flatMap { stage =>
          // Option(...) skips a stage that is absent from the map instead of failing on null.
          Option(matchResult.get(stage)).map(events => (stage, events.asScala.mkString(",")))
        }
        .mkString(";")
      out.collect(info)
    }
  }
)
result.print()
env.execute("cep demo")
从上面可以看出入口是
- 一个一般的 DataStream
- 然后经过一个 Pattern, 得到一个 PatternStream
- 最后再通过调用 PatternStream#process 又变成一个一般的 DataStream
1. PatternStream#process
现在我们具体看下 process 到底做了什么
/**
 * Applies the given process function to every match of this pattern stream.
 *
 * @param patternProcessFunction function invoked for the detected matches
 * @param outTypeInfo            type information of the produced elements
 * @return the stream built by {@code builder.build(...)} from the cleaned function
 */
public <R> SingleOutputStreamOperator<R> process(
        final PatternProcessFunction<T, R> patternProcessFunction,
        final TypeInformation<R> outTypeInfo) {
    // Pass the user function through builder.clean(...) before it is handed to the operator.
    final PatternProcessFunction<T, R> cleanedFunction = builder.clean(patternProcessFunction);
    return builder.build(outTypeInfo, cleanedFunction);
}
/**
 * Compiles the pattern into an NFA factory, wraps it in a {@code CepOperator}
 * and attaches that operator to the input stream.
 *
 * @param outTypeInfo     type information of the elements the operator emits
 * @param processFunction user function invoked for the matches
 * @return the stream produced by the CEP operator
 */
<OUT, K> SingleOutputStreamOperator<OUT> build(
        final TypeInformation<OUT> outTypeInfo,
        final PatternProcessFunction<IN, OUT> processFunction) {

    final TypeSerializer<IN> inputSerializer =
        inputStream.getType().createSerializer(inputStream.getExecutionConfig());
    final boolean isProcessingTime =
        inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;

    // Timeout handling is switched on when the user function also implements TimedOutPartialMatchHandler.
    final boolean timeoutHandling = processFunction instanceof TimedOutPartialMatchHandler;
    final NFACompiler.NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);

    final CepOperator<IN, K, OUT> cepOperator = new CepOperator<>(
        inputSerializer,
        isProcessingTime,
        nfaFactory,
        comparator,
        pattern.getAfterMatchSkipStrategy(),
        processFunction,
        lateDataOutputTag);

    if (!(inputStream instanceof KeyedStream)) {
        // Non-keyed input: key every element with the same NullByteKeySelector
        // and force the resulting operator to be non-parallel.
        KeySelector<IN, Byte> nullKeySelector = new NullByteKeySelector<>();
        return inputStream.keyBy(nullKeySelector).transform(
            "GlobalCepOperator",
            outTypeInfo,
            cepOperator
        ).forceNonParallel();
    }

    // Keyed input: run the operator on the existing keyed stream.
    KeyedStream<IN, K> keyedInput = (KeyedStream<IN, K>) inputStream;
    return keyedInput.transform(
        "CepOperator",
        outTypeInfo,
        cepOperator);
}
从上面可以看出具体的计算其实还是封装进了 CepOperator 里面
2. CepOperator
数据存储对象:
// Value state holding the NFAState.
private transient ValueState<NFAState> computationStates;
// Buffered elements grouped by timestamp; bufferEvent appends to the list stored under a timestamp.
private transient MapState<Long, List<IN>> elementQueueState;
// Shared buffer; advanceTime and processEvent obtain a SharedBufferAccessor from it.
private transient SharedBuffer<IN> partialMatches;
对每个元素的处理情况:
/**
 * Handles one incoming record.
 *
 * <ul>
 *   <li>Event time: records newer than the last seen watermark are buffered and a
 *       watermark timer is registered; other records go to the late-data side
 *       output when one is configured, and are dropped otherwise.</li>
 *   <li>Processing time with a comparator: the record is buffered under the current
 *       processing time and a timer for the next millisecond is registered.</li>
 *   <li>Processing time without a comparator: the record is fed to the NFA immediately.</li>
 * </ul>
 *
 * @param element the incoming stream record
 */
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    if (!isProcessingTime) {
        final long eventTimestamp = element.getTimestamp();
        final IN payload = element.getValue();

        // In event time the watermark is assumed to be correct: a timestamp smaller than
        // or equal to the last seen watermark means the event is late.
        if (eventTimestamp > lastWatermark) {
            // Valid timestamp: keep the event until the matching watermark arrives.
            saveRegisterWatermarkTimer();
            bufferEvent(payload, eventTimestamp);
        } else if (lateDataOutputTag != null) {
            // Late event: emit it to the dedicated side output.
            output.collect(lateDataOutputTag, element);
        }
        return;
    }

    if (comparator != null) {
        final long bufferTime = timerService.currentProcessingTime();
        bufferEvent(element.getValue(), bufferTime);
        // Register a timer for the next millisecond to sort and emit the buffered data.
        timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, bufferTime + 1);
        return;
    }

    // Processing time without a comparator: there can be no out-of-order elements.
    final NFAState nfaState = getNFAState();
    final long now = getProcessingTimeService().getCurrentProcessingTime();
    advanceTime(nfaState, now);
    processEvent(nfaState, element.getValue(), now);
    updateNFA(nfaState);
}
从上面可以看出当 isProcessingTime && comparator == null 的时候, 会进行数据的及时处理
/**
 * Advances the NFA to the given timestamp and forwards any sequences that timed out.
 *
 * @param nfaState  NFA state to advance
 * @param timestamp time to advance to
 */
private void advanceTime(NFAState nfaState, long timestamp) throws Exception {
    try (SharedBufferAccessor<IN> accessor = partialMatches.getAccessor()) {
        final Collection<Tuple2<Map<String, List<IN>>, Long>> expired =
            nfa.advanceTime(accessor, nfaState, timestamp);
        if (expired.isEmpty()) {
            // Nothing timed out at this timestamp.
            return;
        }
        processTimedOutSequences(expired);
    }
}
/**
 * Feeds a single event into the NFA and forwards the matches it produces.
 *
 * @param nfaState  NFA state to drive
 * @param event     the event to process
 * @param timestamp timestamp associated with the event
 */
private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {
    try (SharedBufferAccessor<IN> accessor = partialMatches.getAccessor()) {
        processMatchedSequences(
            nfa.process(accessor, nfaState, event, timestamp, afterMatchSkipStrategy, cepTimerService),
            timestamp);
    }
}
其他情况都是调用 bufferEvent 并同时注册一个定时器, 来处理这些缓存起来的数据
bufferEvent 将数据都放入了 elementQueueState
/**
 * Appends an event to the list buffered under the given time in {@code elementQueueState}.
 *
 * @param event       the event to buffer
 * @param currentTime key under which the event is stored
 */
private void bufferEvent(IN event, long currentTime) throws Exception {
    List<IN> bucket = elementQueueState.get(currentTime);
    if (bucket == null) {
        bucket = new ArrayList<>();
    }

    // With object reuse enabled, store a copy so that the buffered element cannot be changed.
    final IN toStore = getExecutionConfig().isObjectReuseEnabled()
        ? inputSerializer.copy(event)
        : event;
    bucket.add(toStore);

    elementQueueState.put(currentTime, bucket);
}
又因为 CepOperator 继承了 Triggerable 并实现了 onEventTime 和 onProcessingTime, 所以上面的定时器触发的时候就可以调用这2个实现来处理数据了
/**
 * Collects all keys of {@code elementQueueState} into a priority queue,
 * so that polling yields the smallest timestamp first.
 *
 * @return queue of the buffered timestamps
 */
private PriorityQueue<Long> getSortedTimestamps() throws Exception {
    final PriorityQueue<Long> ordered = new PriorityQueue<>();
    elementQueueState.keys().forEach(ordered::offer);
    return ordered;
}
/**
 * Event-time timer callback. In order, it:
 * <ol>
 *   <li>fetches the pending timestamps for the key and the corresponding NFA state;</li>
 *   <li>feeds all elements buffered at or below the current watermark into the NFA,
 *       in timestamp order and in the order produced by {@code sort(...)};</li>
 *   <li>advances the NFA time to the current watermark so expired patterns are discarded;</li>
 *   <li>stores the updated NFA state and re-registers a watermark timer while there is
 *       still buffered or partially matched data;</li>
 *   <li>records the last seen watermark.</li>
 * </ol>
 *
 * @param timer the timer that fired
 */
@Override
public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
    // STEP 1: pending timestamps (smallest first) and the NFA state to drive.
    final PriorityQueue<Long> pendingTimestamps = getSortedTimestamps();
    final NFAState nfaState = getNFAState();

    // STEP 2: drain everything that is not newer than the current watermark.
    while (!pendingTimestamps.isEmpty() && pendingTimestamps.peek() <= timerService.currentWatermark()) {
        final long timestamp = pendingTimestamps.poll();
        advanceTime(nfaState, timestamp);

        try (Stream<IN> ordered = sort(elementQueueState.get(timestamp))) {
            ordered.forEachOrdered(event -> {
                try {
                    processEvent(nfaState, event, timestamp);
                } catch (Exception e) {
                    // The lambda cannot throw checked exceptions, so wrap them.
                    throw new RuntimeException(e);
                }
            });
        }

        elementQueueState.remove(timestamp);
    }

    // STEP 3: move the NFA forward to the current watermark.
    advanceTime(nfaState, timerService.currentWatermark());

    // STEP 4: persist the NFA state; keep a timer alive while work remains.
    updateNFA(nfaState);
    if (!(pendingTimestamps.isEmpty() && partialMatches.isEmpty())) {
        saveRegisterWatermarkTimer();
    }

    // STEP 5: remember the watermark that was just handled.
    updateLastSeenWatermark(timerService.currentWatermark());
}
/**
 * Processing-time timer callback. In order, it:
 * <ol>
 *   <li>fetches the pending timestamps for the key and the corresponding NFA state;</li>
 *   <li>feeds every buffered element into the NFA, in timestamp order and in the
 *       order produced by {@code sort(...)};</li>
 *   <li>stores the updated NFA state.</li>
 * </ol>
 *
 * @param timer the timer that fired
 */
@Override
public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
    // STEP 1: pending timestamps (smallest first) and the NFA state to drive.
    final PriorityQueue<Long> pendingTimestamps = getSortedTimestamps();
    final NFAState nfaState = getNFAState();

    // STEP 2: drain all buffered timestamps.
    while (!pendingTimestamps.isEmpty()) {
        final long timestamp = pendingTimestamps.poll();
        advanceTime(nfaState, timestamp);

        try (Stream<IN> ordered = sort(elementQueueState.get(timestamp))) {
            ordered.forEachOrdered(event -> {
                try {
                    processEvent(nfaState, event, timestamp);
                } catch (Exception e) {
                    // The lambda cannot throw checked exceptions, so wrap them.
                    throw new RuntimeException(e);
                }
            });
        }

        elementQueueState.remove(timestamp);
    }

    // STEP 3: persist the NFA state.
    updateNFA(nfaState);
}
3. NFA
从上面的代码可以看出代码的核心处理都放在了 NFA 里面
NFA 的具体论文参见 Efficient Pattern Matching over Event Streams
对于开发人员来说, 我们只需要关注 NFA 的大概实现逻辑和解决的核心问题就可以了
上面调用的 NFA 方法有2个:
- advanceTime
- process
3.1 NFACompiler
NFA 的初始化使用到了 NFACompiler
// Compile the pattern into an NFA factory up front; timeoutHandling is forwarded to the compiler.
final NFACompiler.NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);
// The operator is handed the compiled factory (plus the pattern's after-match skip strategy),
// not the Pattern object itself.
final CepOperator<IN, K, OUT> operator = new CepOperator<>(
inputSerializer,
isProcessingTime,
nfaFactory,
comparator,
pattern.getAfterMatchSkipStrategy(),
processFunction,
lateDataOutputTag);
该类将 pattern 进行处理, 得到一个 NFAFactory 并将其传入 CepOperator, 而不是直接将 pattern 传进去
/**
 * Compiles a pattern into a factory for NFAs.
 *
 * @param pattern         the pattern to compile; {@code null} yields a factory for empty NFAs
 * @param timeoutHandling flag forwarded unchanged to the created factory
 * @return factory built from the compiled window time and states
 */
public static <T> NFAFactory<T> compileFactory(
        final Pattern<T, ?> pattern,
        boolean timeoutHandling) {
    if (pattern == null) {
        // No pattern: return a factory for empty NFAs.
        return new NFAFactoryImpl<>(0, Collections.<State<T>>emptyList(), timeoutHandling);
    }

    final NFAFactoryCompiler<T> compiler = new NFAFactoryCompiler<>(pattern);
    compiler.compileFactory();
    return new NFAFactoryImpl<>(compiler.getWindowTime(), compiler.getStates(), timeoutHandling);
}
在 compileFactory 函数里面会真正将 pattern 和 states 关联起来, 这里的 states 也会在下面初始化 NFA 的时候使用到, 并且不再变化
/**
 * Validates the pattern and creates its states, walking from the last pattern
 * element back to the first.
 *
 * @throws MalformedPatternException if the pattern ends with a NotFollowedBy element
 */
void compileFactory() {
    final boolean endsWithNotFollow =
        currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW;
    if (endsWithNotFollow) {
        throw new MalformedPatternException("NotFollowedBy is not supported as a last part of a Pattern!");
    }

    checkPatternNameUniqueness();
    checkPatternSkipStrategy();

    // The pattern is traversed from the end to the beginning, so the first state
    // created is the final state.
    final State<T> endingState = createEndingState();
    // Add all the normal (middle) states in front of it.
    final State<T> afterMiddleStates = createMiddleStates(endingState);
    // Finally add the beginning state.
    createStartState(afterMiddleStates);
}
在 CepOperator#open 里面创建 NFA
/**
 * Creates a new NFA from this factory's states, window time and timeout-handling flag.
 *
 * @return the newly created NFA
 */
@Override
public NFA<T> createNFA() {
    final NFA<T> created = new NFA<>(states, windowTime, timeoutHandling);
    return created;
}
3.2 NFA#process
在 NFA 中, 它自身的成员变量 states (即上文提到的) 是静态的、不变的; 而随着数据不断到来, 整个缓存的数据会处于不同的状态, 这些状态的变动都是由 NFAState 来维护的
对于新来的数据, 当它进入状态机的不同位置时, 会产生不同的后续状态, 因此需要用当前的数据来驱动当前状态机的所有状态; 此时真正的数据都在 SharedBuffer 里面, 并通过 sharedBufferAccessor 来访问/修改
以下代码的实现逻辑看起来很复杂, 具体的逻辑可以参看上面提到的论文, 这里能有个大概的了解即可
/**
 * Drives every current partial match of the NFA with one event.
 *
 * For each computation state in {@code nfaState.getPartialMatches()} the next states are
 * computed. Final states are collected as potential matches, stop states cause the whole
 * path to be discarded (its buffer nodes are released), and all other states are retained
 * as the new partial matches. The potential matches are then materialized, either through
 * the after-match skip strategy or directly, and the new partial matches are stored back
 * on {@code nfaState}.
 *
 * @param sharedBufferAccessor   accessor used to read, materialize and release buffer nodes
 * @param nfaState               state to read the partial matches from and to update
 * @param event                  the wrapped event driving the transition
 * @param afterMatchSkipStrategy strategy consulted when extracting matches
 * @param timerService           passed through to {@code computeNextStates}
 * @return the materialized matches produced by this event
 */
private Collection<Map<String, List<T>>> doProcess(
final SharedBufferAccessor<T> sharedBufferAccessor,
final NFAState nfaState,
final EventWrapper event,
final AfterMatchSkipStrategy afterMatchSkipStrategy,
final TimerService timerService) throws Exception {
// Both queues are ordered by NFAState.COMPUTATION_STATE_COMPARATOR.
final PriorityQueue<ComputationState> newPartialMatches = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
final PriorityQueue<ComputationState> potentialMatches = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
// iterate over all current computations
// When a new event arrives, every entry of `partialMatches` is visited, and for each
// of them the next computation states are computed from the current event.
for (ComputationState computationState : nfaState.getPartialMatches()) {
final Collection<ComputationState> newComputationStates = computeNextStates(
sharedBufferAccessor,
computationState,
event,
timerService);
// The state counts as changed unless the result is exactly one state equal to the current one.
if (newComputationStates.size() != 1) {
nfaState.setStateChanged();
} else if (!newComputationStates.iterator().next().equals(computationState)) {
nfaState.setStateChanged();
}
//delay adding new computation states in case a stop state is reached and we discard the path.
final Collection<ComputationState> statesToRetain = new ArrayList<>();
//if stop state reached in this path
boolean shouldDiscardPath = false;
for (final ComputationState newComputationState : newComputationStates) {
if (isFinalState(newComputationState)) {
potentialMatches.add(newComputationState);
} else if (isStopState(newComputationState)) {
//reached stop state. release entry for the stop state
shouldDiscardPath = true;
sharedBufferAccessor.releaseNode(newComputationState.getPreviousBufferEntry());
} else {
// add new computation state; it will be processed once the next event arrives
statesToRetain.add(newComputationState);
}
}
if (shouldDiscardPath) {
// a stop state was reached in this branch. release branch which results in removing previous event from
// the buffer
for (final ComputationState state : statesToRetain) {
sharedBufferAccessor.releaseNode(state.getPreviousBufferEntry());
}
} else {
newPartialMatches.addAll(statesToRetain);
}
}
if (!potentialMatches.isEmpty()) {
nfaState.setStateChanged();
}
// This is where the actually matched data is extracted.
List<Map<String, List<T>>> result = new ArrayList<>();
if (afterMatchSkipStrategy.isSkipStrategy()) {
processMatchesAccordingToSkipStrategy(sharedBufferAccessor,
nfaState,
afterMatchSkipStrategy,
potentialMatches,
newPartialMatches,
result);
} else {
for (ComputationState match : potentialMatches) {
// Only the first extracted pattern (index 0) is materialized for each match.
Map<String, List<T>> materializedMatch =
sharedBufferAccessor.materializeMatch(
sharedBufferAccessor.extractPatterns(
match.getPreviousBufferEntry(),
match.getVersion()).get(0)
);
result.add(materializedMatch);
sharedBufferAccessor.releaseNode(match.getPreviousBufferEntry());
}
}
// Store the new partial matches on nfaState.
nfaState.setNewPartialMatches(newPartialMatches);
return result;
}