Storm/JStorm Acker機(jī)制


acker概述

JStorm的acker機(jī)制,能夠保證消息至少被處理一次(at least once)。也就是說(shuō),能夠保證不丟消息。這里就詳細(xì)解析一下acker的實(shí)現(xiàn)原理。

消息流

假設(shè)我們有一個(gè)簡(jiǎn)單的topology,結(jié)構(gòu)為spout -> bolt。 spout emit了一條消息,發(fā)送至bolt。bolt作為最后一個(gè)處理者,沒(méi)有再向下游emit消息。

image.png

從上圖可以看到,所有的ack消息都會(huì)發(fā)送到acker,acker會(huì)根據(jù)算法計(jì)算從特定spout發(fā)射出來(lái)的tuple tree是否被完全處理。如果成功處理,則發(fā)送__acker_ack消息給spout,否則發(fā)送__acker_fail消息給spout。然后spout中可以做相應(yīng)的邏輯如重發(fā)消息等。

在JStorm中,acker是一種bolt,因此它的處理、消息發(fā)送跟正常的bolt是一樣的。只不過(guò),acker是JStorm框架創(chuàng)建的bolt,用戶不能自行創(chuàng)建。如果用戶在代碼中使用:

Config.setNumAckers(conf, 1);

就會(huì)自動(dòng)創(chuàng)建并行度為1的acker bolt;如果為0,則就沒(méi)有acker bolt了。

如何判斷消息是否被成功處理?

acker的算法非常巧妙,它利用了數(shù)學(xué)上的異或操作來(lái)實(shí)現(xiàn)對(duì)整個(gè)tuple tree的判斷。在一個(gè)topology中的一條消息形成的tuple tree中,所有的消息,都會(huì)有一個(gè)MessageId,它內(nèi)部其實(shí)就是一個(gè)map:

Map<Long, Long> _anchorsToIds;

存儲(chǔ)的是anchor和anchor value。而anchor其實(shí)就是root_id,它在spout中生成,并且一路透?jìng)鞯剿械腷olt中,屬于同一個(gè)tuple tree中的消息都會(huì)有相同的root_id,它可以唯一標(biāo)識(shí)spout發(fā)出來(lái)的這條消息(以及從下游bolt根據(jù)這個(gè)tuple衍生發(fā)出的消息)。
下面是一個(gè)tuple的ack流程:

  1. spout發(fā)送消息時(shí),先生成root_id。
  2. 對(duì)每一個(gè)目標(biāo)bolt task,生成<root_id, random()>,即為這個(gè)root_id對(duì)應(yīng)一個(gè)隨機(jī)數(shù)值,然后隨著消息本身發(fā)送到下游bolt中。假設(shè)有2個(gè)bolt,生成的隨機(jī)數(shù)對(duì)分別為:<root_id, r1>, <root_id, r2>。
  3. spout向acker發(fā)送ack_init消息,它的MessageId =<root_id, r1 ^ r2>(即所有task產(chǎn)生的隨機(jī)數(shù)列表的異或值)。
  4. bolt收到spout或上游bolt發(fā)送過(guò)來(lái)的tuple之后,首先它會(huì)向acker發(fā)送ack消息,MessageId即為收到的值。同時(shí),如果bolt下游還有bolt,則跟步驟2類似,會(huì)對(duì)每一個(gè)bolt,生成隨機(jī)數(shù)對(duì),root_id相同,但是值變?yōu)?code>當(dāng)前值 ^ 新生成的隨機(jī)數(shù)。以此類推。
  5. acker收到消息后,會(huì)對(duì)root_id下所有的值做異或操作,如果算出來(lái)的值為0,表示整個(gè)tuple tree被成功處理;否則就會(huì)一直等待,直到超時(shí),則tuple tree處理失敗。
  6. acker通知spout消息處理成功或失敗。

我們以一個(gè)稍微復(fù)雜一點(diǎn)的topology為例,描述一下它的整個(gè)過(guò)程。 假設(shè)我們的topology結(jié)構(gòu)為:spout -> bolt1/bolt2 -> bolt3,即spout同時(shí)向bolt1和bolt2發(fā)送消息,它們處理完后,都向bolt3發(fā)送消息。bolt3沒(méi)有后續(xù)處理節(jié)點(diǎn)。

image.png

1). spout發(fā)射一條消息,生成root_id,由于這個(gè)值不變,我們就用root_id來(lái)標(biāo)識(shí)。 spout -> bolt1的MessageId =<root_id, 1>spout -> bolt2的MessageId =<root_id, 2>spout -> acker的MessageId =<root_id, 1^2>

2). bolt1收到消息后,生成如下消息: bolt1 -> bolt3的MessageId =<root_id, 3>,bolt1 -> acker的MessageId =<root_id, 1^3>

3). 同樣,bolt2收到消息后,生成如下消息: bolt2 -> bolt3的MessageId =<root_id, 4>,bolt2 -> acker的MessageId =<root_id, 2^4>

4). bolt3收到消息后,生成如下消息: bolt3 -> acker的MessageId =<root_id, 3>,bolt3 -> acker的MessageId =<root_id, 4>

5). acker中總共收到以下消息:<root_id, 1^2><root_id, 1^3>,<root_id, 2^4>,<root_id, 3>,<root_id, 4>所有的值進(jìn)行異或之后,即為1^2^1^3^2^4^3^4= 0。


代碼分析

實(shí)現(xiàn)ack的代碼,主要在這幾個(gè)類中:SpoutCollector,BoltCollector,Acker。
其中SpoutCollector.sendSpoutMsg方法

    private List<Integer> sendSpoutMsg(String out_stream_id, List<Object> values, Object message_id, Integer out_task_id) {
        final long startTime = System.nanoTime();
        try {
            // 得到目標(biāo)task id列表
            java.util.List<Integer> out_tasks;
            if (out_task_id != null) {
                out_tasks = sendTargets.get(out_task_id, out_stream_id, values);
            } else {
                out_tasks = sendTargets.get(out_stream_id, values);
            }

            if (out_tasks.size() == 0) {
                // don't need send tuple to other task
                return out_tasks;
            }
            List<Long> ackSeq = new ArrayList<Long>();
            Boolean needAck = (message_id != null) && (ackerNum > 0);

            // 生成隨機(jī)的root_id,但是需要確保在當(dāng)前spout中不能有重復(fù)的,不然就不能保證ack的準(zhǔn)確性了
            Long root_id = MessageId.generateId(random);
            if (needAck) {
                while (pending.containsKey(root_id)) {
                    root_id = MessageId.generateId(random);
                }
            }

            // 遍歷所有的目標(biāo)task,每個(gè)task的messageId=<root_id, 隨機(jī)數(shù)值>
            for (Integer t : out_tasks) {
                MessageId msgid;
                if (needAck) {
                    Long as = MessageId.generateId(random);
                    msgid = MessageId.makeRootId(root_id, as);
                    // 添加到ackSeq list中,后面會(huì)有用
                    ackSeq.add(as);
                } else {
                    msgid = MessageId.makeUnanchored();
                }

                // 扔到transfer queue中,即進(jìn)入發(fā)送隊(duì)列
                TupleImplExt tp = new TupleImplExt(topology_context, values, task_id, out_stream_id, msgid);
                tp.setTargetTaskId(t);
                transfer_fn.transfer(tp);
            }

            // ack消息的邏輯在這里面,上面對(duì)所有的目標(biāo)task分別emit消息,但是ack_init消息只需要發(fā)送一條。
            if (needAck) {
                TupleInfo info = new TupleInfo();
                info.setStream(out_stream_id);
                info.setValues(values);
                info.setMessageId(message_id);
                info.setTimestamp(System.nanoTime());

                pending.putHead(root_id, info);

                // messageId = <root_id, 所有目標(biāo)task的messageId隨機(jī)數(shù)值的異或>
                List<Object> ackerTuple = JStormUtils.mk_list((Object) root_id, JStormUtils.bit_xor_vals(ackSeq), task_id);

                // 發(fā)送給acker。會(huì)根據(jù)__acker_init這個(gè)stream直接找到task id進(jìn)行發(fā)送。
                UnanchoredSend.send(topology_context, sendTargets, transfer_fn, Acker.ACKER_INIT_STREAM_ID, ackerTuple);

            } else if (message_id != null) {
                // 這里的邏輯,處理沒(méi)有acker,但是仍然實(shí)現(xiàn)了IAckValueSpout接口的情況,需要給這種spout回調(diào)ack方法的機(jī)會(huì)。
                TupleInfo info = new TupleInfo();
                info.setStream(out_stream_id);
                info.setValues(values);
                info.setMessageId(message_id);
                info.setTimestamp(0);

                AckSpoutMsg ack = new AckSpoutMsg(spout, null, info, task_stats, isDebug);
                ack.run();
            }

            return out_tasks;
        } finally {
            long endTime = System.nanoTime();
            emitTotalTimer.update((endTime - startTime) / TimeUtils.NS_PER_US);
        }
    }

再來(lái)看一下BoltCollector類的邏輯,通常來(lái)說(shuō)bolt是先execute(先emit),再執(zhí)行ack方法。因此先看boltEmit方法:

    private List<Integer> boltEmit(String out_stream_id, Collection<Tuple> anchors, List<Object> values, Integer out_task_id) {
        final long start = System.nanoTime();
        try {
            // 一樣地獲取所有目標(biāo)task列表
            java.util.List<Integer> out_tasks;
            if (out_task_id != null) {
                out_tasks = sendTargets.get(out_task_id, out_stream_id, values);
            } else {
                out_tasks = sendTargets.get(out_stream_id, values);
            }

            // 遍歷所有目標(biāo)task,每一個(gè)目標(biāo)task的message id= <root_id, edge_id>,其中edge_id是在這個(gè)bolt里新生成的隨機(jī)數(shù)
            for (Integer t : out_tasks) {
                Map<Long, Long> anchors_to_ids = new HashMap<Long, Long>();
                if (anchors != null) {
                    // 在一般的情況下anchors的size=1,見(jiàn)BasicOutputCollector類,即為當(dāng)前收到的inputTuple。
                    for (Tuple a : anchors) {
                        Long edge_id = MessageId.generateId(random);
                        long now = System.currentTimeMillis();
                        // 這里是提前刪除可能的超時(shí)tuple
                        if (now - lastRotate > rotateTime) {
                            pending_acks.rotate();
                            lastRotate = now;
                        }
                        // 這里會(huì)將<inputTuple, edge_id>放入pending_acks
                        put_xor(pending_acks, a, edge_id);
                        // 這里將每一對(duì)<root_id, edge_id>放入anchors_to_ids(一般情況下也只有一對(duì)),由于anchors_to_ids是一個(gè)空map,因此put_xor里面,相當(dāng)于拿root_id對(duì)應(yīng)的值^0 = root_id的值
                        for (Long root_id : a.getMessageId().getAnchorsToIds().keySet()) {
                            put_xor(anchors_to_ids, root_id, edge_id);
                        }
                    }
                }

                // 往目標(biāo)bolt發(fā)送消息
                MessageId msgid = MessageId.makeId(anchors_to_ids);
                TupleImplExt tupleExt = new TupleImplExt(topologyContext, values, task_id, out_stream_id, msgid);
                tupleExt.setTargetTaskId(t);
                taskTransfer.transfer(tupleExt);
            }
            return out_tasks;
        } catch (Exception e) {
            LOG.error("bolt emit", e);
        } finally {
            long end = System.nanoTime();
            timer.update((end - start) / TimeUtils.NS_PER_US);
        }
        return new ArrayList<Integer>();
    }

emit完之后,再來(lái)看ack的邏輯:

    public void ack(Tuple input) {
        if (ackerNum > 0) {
            Long ack_val = 0L;
            // 這里取出boltEmit放入的對(duì)象:<inputTuple, edge_id>
            Object pend_val = pending_acks.remove(input);
            if (pend_val != null) {
                // ack_val = edge_id
                ack_val = (Long) (pend_val);
            }

            // 發(fā)送ack消息,messageId = <root_id, inputTuple的隨機(jī)數(shù) ^ edge_id>
            for (Entry<Long, Long> e : input.getMessageId().getAnchorsToIds().entrySet()) {
                UnanchoredSend.send(topologyContext, sendTargets, taskTransfer, Acker.ACKER_ACK_STREAM_ID,
                        JStormUtils.mk_list((Object) e.getKey(), JStormUtils.bit_xor(e.getValue(), ack_val)));
            }
        }

        Long startTime = (Long) tuple_start_times.remove(input);
        if (startTime != null) {
            Long endTime = System.nanoTime();
            long latency = (endTime - startTime)/TimeUtils.NS_PER_US;
            long lifeCycle = (System.currentTimeMillis() - ((TupleExt) input).getCreationTimeStamp()) * TimeUtils.NS_PER_US;

            task_stats.bolt_acked_tuple(input.getSourceComponent(), input.getSourceStreamId(), latency, lifeCycle);
        }
    }

最后就是acker了,這個(gè)邏輯比較簡(jiǎn)單:

    public void execute(Tuple input) {
        Object id = input.getValue(0);
        AckObject curr = pending.get(id);
        String stream_id = input.getSourceStreamId();
        // __acker_init消息,由spout發(fā)送,直接放入pending map中
        if (Acker.ACKER_INIT_STREAM_ID.equals(stream_id)) {
            if (curr == null) {
                curr = new AckObject();

                curr.val = input.getLong(1);
                curr.spout_task = input.getInteger(2);

                pending.put(id, curr);
            } else {
                // bolt's ack first come
                curr.update_ack(input.getValue(1));
                curr.spout_task = input.getInteger(2);
            }

        } else if (Acker.ACKER_ACK_STREAM_ID.equals(stream_id)) {
            // __ack_ack消息
            if (curr != null) {
                curr.update_ack(input.getValue(1));
            } else {
                // two case
                // one is timeout
                // the other is bolt's ack first come
                curr = new AckObject();
                curr.val = input.getLong(1);
                pending.put(id, curr);
            }
        } else if (Acker.ACKER_FAIL_STREAM_ID.equals(stream_id)) {
            // 也有可能直接fail了
            if (curr == null) {
                // do nothing
                // already timeout, should go fail
                return;
            }
            curr.failed = true;
        } else {
            LOG.info("Unknow source stream");
            return;
        }

        // 告訴spout這個(gè)消息ack/fail了
        Integer task = curr.spout_task;
        if (task != null) {
            if (curr.val == 0) {
                pending.remove(id);
                List values = JStormUtils.mk_list(id);
                collector.emitDirect(task, Acker.ACKER_ACK_STREAM_ID, values);
            } else {
                if (curr.failed) {
                    pending.remove(id);
                    List values = JStormUtils.mk_list(id);
                    collector.emitDirect(task, Acker.ACKER_FAIL_STREAM_ID, values);
                }
            }
        } else {

        }

        // 這里只是更新metrics
        // add this operation to update acker's ACK statics
        collector.ack(input);

        long now = System.currentTimeMillis();
        if (now - lastRotate > rotateTime) {
            lastRotate = now;
            Map<Object, AckObject> tmp = pending.rotate();
            LOG.info("Acker's timeout item size:{}", tmp.size());
        }
    }

如何使用acker

  1. 設(shè)置acker的并發(fā)度要>0;
  2. spout發(fā)送消息時(shí),使用的接口List <integer>emit(List <object>tuple, Object messageId)其中messageId由用戶指定生成,用戶消息處理成功或者失敗后,用于對(duì)public void ack(Object messageId)public void fail(Object messageId) 接口的回調(diào);
  3. 如果spout同時(shí)從IAckValueSpoutIFailValueSpout派生,那么要求實(shí)現(xiàn)void fail(Object messageId, List <object>values)void ack(Object messageId, List <object>values);這兩接口除了會(huì)返回messageId,還會(huì)返回每一條消息;
  4. bolt一般從如果從IRichBolt派生,發(fā)送消息到下游時(shí)要注意以下兩種不同類型的接口:
public List<Integer> emit(Tuple anchor, List<Object> tuple); 
//anchor 代表當(dāng)前bolt接收到的消息, tuple代表發(fā)送到下游的消息 
public List<Integer> emit(List<Object> tuple); 
//如果對(duì)即將發(fā)送的消息不打算acker的話,可以直接用第二種接口;
如果需要對(duì)即將發(fā)送的下游的消息要進(jìn)行acker的話,emit的時(shí)候需要攜帶anchor`
  1. 如果bolt接收到的消息是需要被acker的話,記得在execute里頭別忘了執(zhí)行_collector.ack(tuple)操作;例子如下
@Override 
public void execute(Tuple tuple) { 
       _collector.emit(tuple, new Values(tuple.getString(0)));           
       _collector.ack(tuple); 
}
  1. 對(duì)于從IRichBolt派生的的bolt來(lái)說(shuō)是不是很麻煩,即要求采樣合適的emit接口,還要求主動(dòng)執(zhí)行acker操作,那么好消息來(lái)了如果當(dāng)前bolt是從IBasicBolt派生的話,內(nèi)部都會(huì)幫你執(zhí)行這些操作,你只管調(diào)用emit(List <object>tuple)發(fā)送消息即可;

  2. 例子如下

public class PairCount implements IBasicBolt {
    private static final long serialVersionUID = 7346295981904929419L;
    public static final Logger LOG =LoggerFactory.getLogger(PairCount.class);
    private AtomicLong  sum = new AtomicLong(0);
    private TpsCounter tpsCounter;

    public void prepare(Map conf, TopologyContext context) {
        tpsCounter = new TpsCounter(context.getThisComponentId() + 
                ":" + context.getThisTaskId());
        LOG.info("Successfully do parepare " +context.getThisComponentId());
    }

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        tpsCounter.count();

        Long tupleId = tuple.getLong(0);
        Pair pair = (Pair)tuple.getValue(1);

        sum.addAndGet(pair.getValue());

        // 如果需要ack,只需要這么做:
        collector.emit(new Values(tupleId, pair)); 
    }

    public void cleanup() {
        tpsCounter.cleanup();
        LOG.info("Total receive value :" + sum);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("ID", "PAIR"));
    }

    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容