acker概述
JStorm的acker機(jī)制,能夠保證消息至少被處理一次(at least once)。也就是說(shuō),能夠保證不丟消息。這里就詳細(xì)解析一下acker的實(shí)現(xiàn)原理。
消息流
假設(shè)我們有一個(gè)簡(jiǎn)單的topology,結(jié)構(gòu)為spout -> bolt。 spout emit了一條消息,發(fā)送至bolt。bolt作為最后一個(gè)處理者,沒(méi)有再向下游emit消息。

從上圖可以看到,所有的ack消息都會(huì)發(fā)送到acker,acker會(huì)根據(jù)算法計(jì)算從特定spout發(fā)射出來(lái)的tuple tree是否被完全處理。如果成功處理,則發(fā)送__acker_ack消息給spout,否則發(fā)送__acker_fail消息給spout。然后spout中可以做相應(yīng)的邏輯如重發(fā)消息等。
在JStorm中,acker是一種bolt,因此它的處理、消息發(fā)送跟正常的bolt是一樣的。只不過(guò),acker是JStorm框架創(chuàng)建的bolt,用戶不能自行創(chuàng)建。如果用戶在代碼中使用:
Config.setNumAckers(conf, 1);
就會(huì)自動(dòng)創(chuàng)建并行度為1的acker bolt;如果為0,則就沒(méi)有acker bolt了。
如何判斷消息是否被成功處理?
acker的算法非常巧妙,它利用了數(shù)學(xué)上的異或操作來(lái)實(shí)現(xiàn)對(duì)整個(gè)tuple tree的判斷。在一個(gè)topology中的一條消息形成的tuple tree中,所有的消息,都會(huì)有一個(gè)MessageId,它內(nèi)部其實(shí)就是一個(gè)map:
Map<Long, Long> _anchorsToIds;
存儲(chǔ)的是anchor和anchor value。而anchor其實(shí)就是root_id,它在spout中生成,并且一路透?jìng)鞯剿械腷olt中,屬于同一個(gè)tuple tree中的消息都會(huì)有相同的root_id,它可以唯一標(biāo)識(shí)spout發(fā)出來(lái)的這條消息(以及從下游bolt根據(jù)這個(gè)tuple衍生發(fā)出的消息)。
下面是一個(gè)tuple的ack流程:
- spout發(fā)送消息時(shí),先生成root_id。
- 對(duì)每一個(gè)目標(biāo)bolt task,生成
<root_id, random()>,即為這個(gè)root_id對(duì)應(yīng)一個(gè)隨機(jī)數(shù)值,然后隨著消息本身發(fā)送到下游bolt中。假設(shè)有2個(gè)bolt,生成的隨機(jī)數(shù)對(duì)分別為:<root_id, r1>,<root_id, r2>。 - spout向acker發(fā)送ack_init消息,它的MessageId =
<root_id, r1 ^ r2>(即所有task產(chǎn)生的隨機(jī)數(shù)列表的異或值)。 - bolt收到spout或上游bolt發(fā)送過(guò)來(lái)的tuple之后,首先它會(huì)向acker發(fā)送ack消息,MessageId即為收到的值。同時(shí),如果bolt下游還有bolt,則跟步驟2類似,會(huì)對(duì)每一個(gè)bolt,生成隨機(jī)數(shù)對(duì),root_id相同,但是值變?yōu)?code>當(dāng)前值 ^ 新生成的隨機(jī)數(shù)。以此類推。
- acker收到消息后,會(huì)對(duì)root_id下所有的值做異或操作,如果算出來(lái)的值為0,表示整個(gè)tuple tree被成功處理;否則就會(huì)一直等待,直到超時(shí),則tuple tree處理失敗。
- acker通知spout消息處理成功或失敗。
我們以一個(gè)稍微復(fù)雜一點(diǎn)的topology為例,描述一下它的整個(gè)過(guò)程。 假設(shè)我們的topology結(jié)構(gòu)為:spout -> bolt1/bolt2 -> bolt3,即spout同時(shí)向bolt1和bolt2發(fā)送消息,它們處理完后,都向bolt3發(fā)送消息。bolt3沒(méi)有后續(xù)處理節(jié)點(diǎn)。

1). spout發(fā)射一條消息,生成root_id,由于這個(gè)值不變,我們就用root_id來(lái)標(biāo)識(shí)。 spout -> bolt1的MessageId =<root_id, 1>spout -> bolt2的MessageId =<root_id, 2>spout -> acker的MessageId =<root_id, 1^2>
2). bolt1收到消息后,生成如下消息: bolt1 -> bolt3的MessageId =<root_id, 3>,bolt1 -> acker的MessageId =<root_id, 1^3>
3). 同樣,bolt2收到消息后,生成如下消息: bolt2 -> bolt3的MessageId =<root_id, 4>,bolt2 -> acker的MessageId =<root_id, 2^4>
4). bolt3收到消息后,生成如下消息: bolt3 -> acker的MessageId =<root_id, 3>,bolt3 -> acker的MessageId =<root_id, 4>
5). acker中總共收到以下消息:<root_id, 1^2>,<root_id, 1^3>,<root_id, 2^4>,<root_id, 3>,<root_id, 4>所有的值進(jìn)行異或之后,即為1^2^1^3^2^4^3^4= 0。
代碼分析
實(shí)現(xiàn)ack的代碼,主要在這幾個(gè)類中:SpoutCollector,BoltCollector,Acker。
其中SpoutCollector.sendSpoutMsg方法:
private List<Integer> sendSpoutMsg(String out_stream_id, List<Object> values, Object message_id, Integer out_task_id) {
final long startTime = System.nanoTime();
try {
// 得到目標(biāo)task id列表
java.util.List<Integer> out_tasks;
if (out_task_id != null) {
out_tasks = sendTargets.get(out_task_id, out_stream_id, values);
} else {
out_tasks = sendTargets.get(out_stream_id, values);
}
if (out_tasks.size() == 0) {
// don't need send tuple to other task
return out_tasks;
}
List<Long> ackSeq = new ArrayList<Long>();
Boolean needAck = (message_id != null) && (ackerNum > 0);
// 生成隨機(jī)的root_id,但是需要確保在當(dāng)前spout中不能有重復(fù)的,不然就不能保證ack的準(zhǔn)確性了
Long root_id = MessageId.generateId(random);
if (needAck) {
while (pending.containsKey(root_id)) {
root_id = MessageId.generateId(random);
}
}
// 遍歷所有的目標(biāo)task,每個(gè)task的messageId=<root_id, 隨機(jī)數(shù)值>
for (Integer t : out_tasks) {
MessageId msgid;
if (needAck) {
Long as = MessageId.generateId(random);
msgid = MessageId.makeRootId(root_id, as);
// 添加到ackSeq list中,后面會(huì)有用
ackSeq.add(as);
} else {
msgid = MessageId.makeUnanchored();
}
// 扔到transfer queue中,即進(jìn)入發(fā)送隊(duì)列
TupleImplExt tp = new TupleImplExt(topology_context, values, task_id, out_stream_id, msgid);
tp.setTargetTaskId(t);
transfer_fn.transfer(tp);
}
// ack消息的邏輯在這里面,上面對(duì)所有的目標(biāo)task分別emit消息,但是ack_init消息只需要發(fā)送一條。
if (needAck) {
TupleInfo info = new TupleInfo();
info.setStream(out_stream_id);
info.setValues(values);
info.setMessageId(message_id);
info.setTimestamp(System.nanoTime());
pending.putHead(root_id, info);
// messageId = <root_id, 所有目標(biāo)task的messageId隨機(jī)數(shù)值的異或>
List<Object> ackerTuple = JStormUtils.mk_list((Object) root_id, JStormUtils.bit_xor_vals(ackSeq), task_id);
// 發(fā)送給acker。會(huì)根據(jù)__acker_init這個(gè)stream直接找到task id進(jìn)行發(fā)送。
UnanchoredSend.send(topology_context, sendTargets, transfer_fn, Acker.ACKER_INIT_STREAM_ID, ackerTuple);
} else if (message_id != null) {
// 這里的邏輯,處理沒(méi)有acker,但是仍然實(shí)現(xiàn)了IAckValueSpout接口的情況,需要給這種spout回調(diào)ack方法的機(jī)會(huì)。
TupleInfo info = new TupleInfo();
info.setStream(out_stream_id);
info.setValues(values);
info.setMessageId(message_id);
info.setTimestamp(0);
AckSpoutMsg ack = new AckSpoutMsg(spout, null, info, task_stats, isDebug);
ack.run();
}
return out_tasks;
} finally {
long endTime = System.nanoTime();
emitTotalTimer.update((endTime - startTime) / TimeUtils.NS_PER_US);
}
}
再來(lái)看一下BoltCollector類的邏輯,通常來(lái)說(shuō)bolt是先execute(先emit),再執(zhí)行ack方法。因此先看boltEmit方法:
private List<Integer> boltEmit(String out_stream_id, Collection<Tuple> anchors, List<Object> values, Integer out_task_id) {
final long start = System.nanoTime();
try {
// 一樣地獲取所有目標(biāo)task列表
java.util.List<Integer> out_tasks;
if (out_task_id != null) {
out_tasks = sendTargets.get(out_task_id, out_stream_id, values);
} else {
out_tasks = sendTargets.get(out_stream_id, values);
}
// 遍歷所有目標(biāo)task,每一個(gè)目標(biāo)task的message id= <root_id, edge_id>,其中edge_id是在這個(gè)bolt里新生成的隨機(jī)數(shù)
for (Integer t : out_tasks) {
Map<Long, Long> anchors_to_ids = new HashMap<Long, Long>();
if (anchors != null) {
// 在一般的情況下anchors的size=1,見(jiàn)BasicOutputCollector類,即為當(dāng)前收到的inputTuple。
for (Tuple a : anchors) {
Long edge_id = MessageId.generateId(random);
long now = System.currentTimeMillis();
// 這里是提前刪除可能的超時(shí)tuple
if (now - lastRotate > rotateTime) {
pending_acks.rotate();
lastRotate = now;
}
// 這里會(huì)將<inputTuple, edge_id>放入pending_acks
put_xor(pending_acks, a, edge_id);
// 這里將每一對(duì)<root_id, edge_id>放入anchors_to_ids(一般情況下也只有一對(duì)),由于anchors_to_ids是一個(gè)空map,因此put_xor里面,相當(dāng)于拿root_id對(duì)應(yīng)的值^0 = root_id的值
for (Long root_id : a.getMessageId().getAnchorsToIds().keySet()) {
put_xor(anchors_to_ids, root_id, edge_id);
}
}
}
// 往目標(biāo)bolt發(fā)送消息
MessageId msgid = MessageId.makeId(anchors_to_ids);
TupleImplExt tupleExt = new TupleImplExt(topologyContext, values, task_id, out_stream_id, msgid);
tupleExt.setTargetTaskId(t);
taskTransfer.transfer(tupleExt);
}
return out_tasks;
} catch (Exception e) {
LOG.error("bolt emit", e);
} finally {
long end = System.nanoTime();
timer.update((end - start) / TimeUtils.NS_PER_US);
}
return new ArrayList<Integer>();
}
emit完之后,再來(lái)看ack的邏輯:
public void ack(Tuple input) {
if (ackerNum > 0) {
Long ack_val = 0L;
// 這里取出boltEmit放入的對(duì)象:<inputTuple, edge_id>
Object pend_val = pending_acks.remove(input);
if (pend_val != null) {
// ack_val = edge_id
ack_val = (Long) (pend_val);
}
// 發(fā)送ack消息,messageId = <root_id, inputTuple的隨機(jī)數(shù) ^ edge_id>
for (Entry<Long, Long> e : input.getMessageId().getAnchorsToIds().entrySet()) {
UnanchoredSend.send(topologyContext, sendTargets, taskTransfer, Acker.ACKER_ACK_STREAM_ID,
JStormUtils.mk_list((Object) e.getKey(), JStormUtils.bit_xor(e.getValue(), ack_val)));
}
}
Long startTime = (Long) tuple_start_times.remove(input);
if (startTime != null) {
Long endTime = System.nanoTime();
long latency = (endTime - startTime)/TimeUtils.NS_PER_US;
long lifeCycle = (System.currentTimeMillis() - ((TupleExt) input).getCreationTimeStamp()) * TimeUtils.NS_PER_US;
task_stats.bolt_acked_tuple(input.getSourceComponent(), input.getSourceStreamId(), latency, lifeCycle);
}
}
最后就是acker了,這個(gè)邏輯比較簡(jiǎn)單:
public void execute(Tuple input) {
Object id = input.getValue(0);
AckObject curr = pending.get(id);
String stream_id = input.getSourceStreamId();
// __acker_init消息,由spout發(fā)送,直接放入pending map中
if (Acker.ACKER_INIT_STREAM_ID.equals(stream_id)) {
if (curr == null) {
curr = new AckObject();
curr.val = input.getLong(1);
curr.spout_task = input.getInteger(2);
pending.put(id, curr);
} else {
// bolt's ack first come
curr.update_ack(input.getValue(1));
curr.spout_task = input.getInteger(2);
}
} else if (Acker.ACKER_ACK_STREAM_ID.equals(stream_id)) {
// __ack_ack消息
if (curr != null) {
curr.update_ack(input.getValue(1));
} else {
// two case
// one is timeout
// the other is bolt's ack first come
curr = new AckObject();
curr.val = input.getLong(1);
pending.put(id, curr);
}
} else if (Acker.ACKER_FAIL_STREAM_ID.equals(stream_id)) {
// 也有可能直接fail了
if (curr == null) {
// do nothing
// already timeout, should go fail
return;
}
curr.failed = true;
} else {
LOG.info("Unknow source stream");
return;
}
// 告訴spout這個(gè)消息ack/fail了
Integer task = curr.spout_task;
if (task != null) {
if (curr.val == 0) {
pending.remove(id);
List values = JStormUtils.mk_list(id);
collector.emitDirect(task, Acker.ACKER_ACK_STREAM_ID, values);
} else {
if (curr.failed) {
pending.remove(id);
List values = JStormUtils.mk_list(id);
collector.emitDirect(task, Acker.ACKER_FAIL_STREAM_ID, values);
}
}
} else {
}
// 這里只是更新metrics
// add this operation to update acker's ACK statics
collector.ack(input);
long now = System.currentTimeMillis();
if (now - lastRotate > rotateTime) {
lastRotate = now;
Map<Object, AckObject> tmp = pending.rotate();
LOG.info("Acker's timeout item size:{}", tmp.size());
}
}
如何使用acker
- 設(shè)置acker的并發(fā)度要>0;
- spout發(fā)送消息時(shí),使用的接口
List <integer>emit(List <object>tuple, Object messageId)其中messageId由用戶指定生成,用戶消息處理成功或者失敗后,用于對(duì)public void ack(Object messageId)和public void fail(Object messageId)接口的回調(diào); - 如果spout同時(shí)從
IAckValueSpout和IFailValueSpout派生,那么要求實(shí)現(xiàn)void fail(Object messageId, List <object>values)和void ack(Object messageId, List <object>values);這兩接口除了會(huì)返回messageId,還會(huì)返回每一條消息; - bolt一般從如果從IRichBolt派生,發(fā)送消息到下游時(shí)要注意以下兩種不同類型的接口:
public List<Integer> emit(Tuple anchor, List<Object> tuple);
//anchor 代表當(dāng)前bolt接收到的消息, tuple代表發(fā)送到下游的消息
public List<Integer> emit(List<Object> tuple);
//如果對(duì)即將發(fā)送的消息不打算acker的話,可以直接用第二種接口;
如果需要對(duì)即將發(fā)送的下游的消息要進(jìn)行acker的話,emit的時(shí)候需要攜帶anchor`
- 如果bolt接收到的消息是需要被acker的話,記得在execute里頭別忘了執(zhí)行_collector.ack(tuple)操作;例子如下
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0)));
_collector.ack(tuple);
}
對(duì)于從IRichBolt派生的的bolt來(lái)說(shuō)是不是很麻煩,即要求采樣合適的emit接口,還要求主動(dòng)執(zhí)行acker操作,那么好消息來(lái)了如果當(dāng)前bolt是從IBasicBolt派生的話,內(nèi)部都會(huì)幫你執(zhí)行這些操作,你只管調(diào)用
emit(List <object>tuple)發(fā)送消息即可;例子如下
public class PairCount implements IBasicBolt {
private static final long serialVersionUID = 7346295981904929419L;
public static final Logger LOG =LoggerFactory.getLogger(PairCount.class);
private AtomicLong sum = new AtomicLong(0);
private TpsCounter tpsCounter;
public void prepare(Map conf, TopologyContext context) {
tpsCounter = new TpsCounter(context.getThisComponentId() +
":" + context.getThisTaskId());
LOG.info("Successfully do parepare " +context.getThisComponentId());
}
public void execute(Tuple tuple, BasicOutputCollector collector) {
tpsCounter.count();
Long tupleId = tuple.getLong(0);
Pair pair = (Pair)tuple.getValue(1);
sum.addAndGet(pair.getValue());
// 如果需要ack,只需要這么做:
collector.emit(new Values(tupleId, pair));
}
public void cleanup() {
tpsCounter.cleanup();
LOG.info("Total receive value :" + sum);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("ID", "PAIR"));
}
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}