Flume事務(wù)詳解

本文基于AvroSource,MemoryChannel,HDFSSink三個(gè)組件,對Flume數(shù)據(jù)傳輸?shù)氖聞?wù)進(jìn)行分析,如果使用的是其他組件,F(xiàn)lume事務(wù)具體的處理方式將會(huì)不同。一般情況下,用MemoryChannel就好了,我們公司用的就是這個(gè),F(xiàn)ileChannel速度慢,雖然提供日志級別的數(shù)據(jù)恢復(fù),但是一般情況下,不斷電MemoryChannel是不會(huì)丟數(shù)據(jù)的。

Flume提供事物操作,保證用戶的數(shù)據(jù)的可靠性,主要體現(xiàn)在:

  • 數(shù)據(jù)在傳輸?shù)较聜€(gè)節(jié)點(diǎn)時(shí)(通常是批量數(shù)據(jù)),如果接收節(jié)點(diǎn)出現(xiàn)異常,比如網(wǎng)絡(luò)異常,則回滾這一批數(shù)據(jù)。因此有可能導(dǎo)致數(shù)據(jù)重發(fā)
  • 同個(gè)節(jié)點(diǎn)內(nèi),Source寫入數(shù)據(jù)到Channel,數(shù)據(jù)在一個(gè)批次內(nèi)的數(shù)據(jù)出現(xiàn)異常,則不寫入到Channel。已接收到的部分?jǐn)?shù)據(jù)直接拋棄,靠上一個(gè)節(jié)點(diǎn)重發(fā)數(shù)據(jù)。


編程模型

一個(gè)事務(wù)的實(shí)現(xiàn)是在一個(gè)channel的實(shí)現(xiàn)中實(shí)現(xiàn)的,每一個(gè)連接到ChannelSourceSink必須獲得一個(gè)事務(wù)對象。Source使用ChannelProcessor管理事務(wù),Sink的事務(wù)管理則通過他們配置的Channel進(jìn)行管理。Flume在對Channel進(jìn)行Put和Take操作的時(shí)候,必須要用事物包住,比如:

Channel ch = new MemoryChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
  // This try clause includes whatever Channel operations you want to do

  Event eventToStage = EventBuilder.withBody("Hello Flume!",
                       Charset.forName("UTF-8"));
  ch.put(eventToStage);
  // Event takenEvent = ch.take();
  // ...
  txn.commit();
} catch (Throwable t) {
  txn.rollback();

  // Log exception, handle individual exceptions as needed

  // re-throw all Errors
  if (t instanceof Error) {
    throw (Error)t;
  }
} finally {
  txn.close();
}

Put事務(wù)流程

Put事務(wù)可以分為以下階段:

  • doPut:將批數(shù)據(jù)先寫入臨時(shí)緩沖區(qū)putList
  • doCommit:檢查channel內(nèi)存隊(duì)列是否足夠合并。
  • doRollback:channel內(nèi)存隊(duì)列空間不足,拋棄數(shù)據(jù),回滾event

我們從Source數(shù)據(jù)接收到寫入Channel這個(gè)過程對Put事務(wù)進(jìn)行分析。


AvroSource會(huì)spawn多個(gè)Worker線程(ThriftSourceHandler)去處理數(shù)據(jù),Worker處理數(shù)據(jù)的接口,我們只看batch批量處理這個(gè)接口:

public Status appendBatch(List<AvroFlumeEvent> events) {
    logger.debug("Avro source {}: Received avro event batch of {} events.",
        getName(), events.size());
    sourceCounter.incrementAppendBatchReceivedCount();
    sourceCounter.addToEventReceivedCount(events.size());

    List<Event> batch = new ArrayList<Event>();

    for (AvroFlumeEvent avroEvent : events) {
      Event event = EventBuilder.withBody(avroEvent.getBody().array(),
          toStringMap(avroEvent.getHeaders()));

      batch.add(event);
    }

    try {
      //ChannelProcessor,在Source初始化的時(shí)候傳進(jìn)來.將數(shù)據(jù)寫入對應(yīng)的Channel
      getChannelProcessor().processEventBatch(batch);
    } catch (Throwable t) {
      logger.error("Avro source " + getName() + ": Unable to process event " +
          "batch. Exception follows.", t);
      sourceCounter.incrementChannelWriteFail();
      if (t instanceof Error) {
        throw (Error) t;
      }
      return Status.FAILED;
    }

    sourceCounter.incrementAppendBatchAcceptedCount();
    sourceCounter.addToEventAcceptedCount(events.size());

    return Status.OK;
  }

事務(wù)邏輯都在processEventBatch這個(gè)方法里:

public void processEventBatch(List<Event> events) {
    Preconditions.checkNotNull(events, "Event list must not be null");
    //預(yù)處理每行數(shù)據(jù)
    events = interceptorChain.intercept(events);

    Map<Channel, List<Event>> reqChannelQueue =
        new LinkedHashMap<Channel, List<Event>>();

    Map<Channel, List<Event>> optChannelQueue =
        new LinkedHashMap<Channel, List<Event>>();

    //分類數(shù)據(jù),劃分不同的channel集合對應(yīng)的數(shù)據(jù)
    for (Event event : events) {
      List<Channel> reqChannels = selector.getRequiredChannels(event);

      for (Channel ch : reqChannels) {
        List<Event> eventQueue = reqChannelQueue.get(ch);
        if (eventQueue == null) {
          eventQueue = new ArrayList<Event>();
          reqChannelQueue.put(ch, eventQueue);
        }
        eventQueue.add(event);
      }

      List<Channel> optChannels = selector.getOptionalChannels(event);

      for (Channel ch : optChannels) {
        List<Event> eventQueue = optChannelQueue.get(ch);
        if (eventQueue == null) {
          eventQueue = new ArrayList<Event>();
          optChannelQueue.put(ch, eventQueue);
        }

        eventQueue.add(event);
      }
    }

    // Process required channels
    for (Channel reqChannel : reqChannelQueue.keySet()) {
      Transaction tx = reqChannel.getTransaction();
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
        //事務(wù)開始,tx即MemoryTransaction類實(shí)例
        tx.begin();

        List<Event> batch = reqChannelQueue.get(reqChannel);

        for (Event event : batch) {
          // 這個(gè)put操作實(shí)際調(diào)用的是transaction.doPut
          reqChannel.put(event);
        }
        //提交,將數(shù)據(jù)寫入Channel的隊(duì)列中
        tx.commit();
      } catch (Throwable t) {
        //回滾
        tx.rollback();
        if (t instanceof Error) {
          LOG.error("Error while writing to required channel: " + reqChannel, t);
          throw (Error) t;
        } else if (t instanceof ChannelException) {
          throw (ChannelException) t;
        } else {
          throw new ChannelException("Unable to put batch on required " +
              "channel: " + reqChannel, t);
        }
      } finally {
        if (tx != null) {
          tx.close();
        }
      }
    }

    // Process optional channels
    for (Channel optChannel : optChannelQueue.keySet()) {
      Transaction tx = optChannel.getTransaction();
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
        tx.begin();

        List<Event> batch = optChannelQueue.get(optChannel);

        for (Event event : batch) {
          optChannel.put(event);
        }

        tx.commit();
      } catch (Throwable t) {
        tx.rollback();
        LOG.error("Unable to put batch on optional channel: " + optChannel, t);
        if (t instanceof Error) {
          throw (Error) t;
        }
      } finally {
        if (tx != null) {
          tx.close();
        }
      }
    }
  }

每個(gè)Worker線程都擁有一個(gè)Transaction實(shí)例,保存在Channel(BasicChannelSemantics)里的ThreadLocal<BasicTransactionSemantics>變量currentTransaction中。
那么,事務(wù)到底做了什么?


實(shí)際上,Transaction實(shí)例包含兩個(gè)雙向阻塞隊(duì)列LinkedBlockingDeque(感覺沒必要用雙向隊(duì)列,每個(gè)線程寫自己的putList,又不是多個(gè)線程?),分別為:

  • putList
  • takeList
    對于Put事物操作,當(dāng)然是只用到putList了。putList就是一個(gè)臨時(shí)的緩沖區(qū),數(shù)據(jù)會(huì)先put到putList,最后由commit方法會(huì)檢查channel是否有足夠的緩沖區(qū),有則合并到channel的隊(duì)列。
    channel.put -> transaction.doPut:
protected void doPut(Event event) throws InterruptedException {
      channelCounter.incrementEventPutAttemptCount();
      //計(jì)算數(shù)據(jù)字節(jié)大小
      int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
      //寫入臨時(shí)緩沖區(qū)putList
      if (!putList.offer(event)) {
        throw new ChannelException(
            "Put queue for MemoryTransaction of capacity " +
            putList.size() + " full, consider committing more frequently, " +
            "increasing capacity or increasing thread count");
      }
      putByteCounter += eventByteSize;
    }

transaction.commit:

protected void doCommit() throws InterruptedException {
      //檢查channel的隊(duì)列剩余大小是否足夠
      int remainingChange = takeList.size() - putList.size();
      if (remainingChange < 0) {
        if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
          throw new ChannelException("Cannot commit transaction. Byte capacity " +
              "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
              "reached. Please increase heap space/byte capacity allocated to " +
              "the channel as the sinks may not be keeping up with the sources");
        }
        if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
          bytesRemaining.release(putByteCounter);
          throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
              " Sinks are likely not keeping up with sources, or the buffer size is too tight");
        }
      }
      int puts = putList.size();
      int takes = takeList.size();
      synchronized (queueLock) {
        if (puts > 0) {
          while (!putList.isEmpty()) {
            //寫入到channel的隊(duì)列
            if (!queue.offer(putList.removeFirst())) {
              throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
            }
          }
        }
        //清除臨時(shí)隊(duì)列
        putList.clear();
        takeList.clear();
      }
      bytesRemaining.release(takeByteCounter);
      takeByteCounter = 0;
      putByteCounter = 0;

      queueStored.release(puts);
      if (remainingChange > 0) {
        queueRemaining.release(remainingChange);
      }
      if (puts > 0) {
        channelCounter.addToEventPutSuccessCount(puts);
      }
      if (takes > 0) {
        channelCounter.addToEventTakeSuccessCount(takes);
      }

      channelCounter.setChannelSize(queue.size());
    }

如果在事務(wù)期間出現(xiàn)異常,比如channel剩余空間不足,則rollback:

protected void doRollback() {
      int takes = takeList.size();
      //檢查內(nèi)存隊(duì)列空間大小,是否足夠takeList寫回去
      synchronized (queueLock) {
        Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),
            "Not enough space in memory channel " +
            "queue to rollback takes. This should never happen, please report");
        while (!takeList.isEmpty()) {
          queue.addFirst(takeList.removeLast());
        }
        //拋棄數(shù)據(jù),沒合并到channel的內(nèi)存隊(duì)列
        putList.clear();
      }
      putByteCounter = 0;
      takeByteCounter = 0;

      queueStored.release(takes);
      channelCounter.setChannelSize(queue.size());
    }

Take事務(wù)流程

Take事務(wù)分為以下階段:

  • doTake:先將數(shù)據(jù)取到臨時(shí)緩沖區(qū)takeList
  • doCommit:如果數(shù)據(jù)全部發(fā)送成功,則清除臨時(shí)緩沖區(qū)takeList
  • doRollback:數(shù)據(jù)發(fā)送過程中如果出現(xiàn)異常,rollback將臨時(shí)緩沖區(qū)takeList中的數(shù)據(jù)歸還給channel內(nèi)存隊(duì)列。


Sink其實(shí)是由SinkRunner線程調(diào)用Sink.process方法來了處理數(shù)據(jù)的。我們從HdfsEventSink的process方法說起,Sink類都有個(gè)process方法,用來處理傳輸數(shù)據(jù)的邏輯。:

public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
     //事務(wù)開始
    transaction.begin();
    try {
      Set<BucketWriter> writers = new LinkedHashSet<>();
      int txnEventCount = 0;
      for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
        //take數(shù)據(jù)到臨時(shí)緩沖區(qū),實(shí)際調(diào)用的是transaction.doTake
        Event event = channel.take();
        if (event == null) {
          break;
        }

        // reconstruct the path name by substituting place holders
        String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
            timeZone, needRounding, roundUnit, roundValue, useLocalTime);
        String realName = BucketPath.escapeString(fileName, event.getHeaders(),
            timeZone, needRounding, roundUnit, roundValue, useLocalTime);

        String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
        BucketWriter bucketWriter;
        HDFSWriter hdfsWriter = null;
        // Callback to remove the reference to the bucket writer from the
        // sfWriters map so that all buffers used by the HDFS file
        // handles are garbage collected.
        WriterCallback closeCallback = new WriterCallback() {
          @Override
          public void run(String bucketPath) {
            LOG.info("Writer callback called.");
            synchronized (sfWritersLock) {
              sfWriters.remove(bucketPath);
            }
          }
        };
        synchronized (sfWritersLock) {
          bucketWriter = sfWriters.get(lookupPath);
          // we haven't seen this file yet, so open it and cache the handle
          if (bucketWriter == null) {
            hdfsWriter = writerFactory.getWriter(fileType);
            bucketWriter = initializeBucketWriter(realPath, realName,
              lookupPath, hdfsWriter, closeCallback);
            sfWriters.put(lookupPath, bucketWriter);
          }
        }

        // Write the data to HDFS
        try {
          //寫數(shù)據(jù)到HDFS
          bucketWriter.append(event);
        } catch (BucketClosedException ex) {
          LOG.info("Bucket was closed while trying to append, " +
                   "reinitializing bucket and writing event.");
          hdfsWriter = writerFactory.getWriter(fileType);
          bucketWriter = initializeBucketWriter(realPath, realName,
            lookupPath, hdfsWriter, closeCallback);
          synchronized (sfWritersLock) {
            sfWriters.put(lookupPath, bucketWriter);
          }
          bucketWriter.append(event);
        }

        // track the buckets getting written in this transaction
        if (!writers.contains(bucketWriter)) {
          writers.add(bucketWriter);
        }
      }

      if (txnEventCount == 0) {
        sinkCounter.incrementBatchEmptyCount();
      } else if (txnEventCount == batchSize) {
        sinkCounter.incrementBatchCompleteCount();
      } else {
        sinkCounter.incrementBatchUnderflowCount();
      }

      // flush all pending buckets before committing the transaction
      for (BucketWriter bucketWriter : writers) {
        bucketWriter.flush();
      }
      //事務(wù)提交
      transaction.commit();

      if (txnEventCount < 1) {
        return Status.BACKOFF;
      } else {
        sinkCounter.addToEventDrainSuccessCount(txnEventCount);
        return Status.READY;
      }
    } catch (IOException eIO) {
      transaction.rollback();
      LOG.warn("HDFS IO error", eIO);
      sinkCounter.incrementEventWriteFail();
      return Status.BACKOFF;
    } catch (Throwable th) {
      transaction.rollback();
      LOG.error("process failed", th);
      sinkCounter.incrementEventWriteOrChannelFail(th);
      if (th instanceof Error) {
        throw (Error) th;
      } else {
        throw new EventDeliveryException(th);
      }
    } finally {
      //關(guān)閉事務(wù)
      transaction.close();
    }
  }

大致流程圖:


接著看看channel.take,作用是將數(shù)據(jù)放到臨時(shí)緩沖區(qū),實(shí)際調(diào)用的是transaction.doTake:

protected Event doTake() throws InterruptedException {
      channelCounter.incrementEventTakeAttemptCount();
      if (takeList.remainingCapacity() == 0) {
        throw new ChannelException("Take list for MemoryTransaction, capacity " +
            takeList.size() + " full, consider committing more frequently, " +
            "increasing capacity, or increasing thread count");
      }
      if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
        return null;
      }
      Event event;
      synchronized (queueLock) {
        //從channel內(nèi)存隊(duì)列取數(shù)據(jù)
        event = queue.poll();
      }
      Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
          "signalling existence of entry");
      //將數(shù)據(jù)放到臨時(shí)緩沖區(qū)
      takeList.put(event);

      int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
      takeByteCounter += eventByteSize;

      return event;
    }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 簡介 Flume是Cloudera提供的一個(gè)高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸?shù)南到y(tǒng),F(xiàn)lume...
    達(dá)微閱讀 791評論 0 2
  • 閱讀目錄(Content) 一、Flume簡介 二、Flume特點(diǎn) 三、Flume的一些核心概念 3.1、Agen...
    達(dá)微閱讀 4,877評論 0 9
  • 1Flume概述 1.1 定義 Flume是Cloudera提供的一個(gè)高可用的,高可靠的,分布式的海量日志采集、聚...
    djm猿閱讀 690評論 0 4
  • 一、Flume簡介 flume 作為 cloudera 開發(fā)的實(shí)時(shí)日志收集系統(tǒng),受到了業(yè)界的認(rèn)可與廣泛應(yīng)用。Flu...
    superxcp閱讀 1,065評論 0 2
  • title: Flume構(gòu)建日志采集系統(tǒng)date: 2018-02-03 19:45tags: [flume,k...
    溯水心生閱讀 16,277評論 3 25

友情鏈接更多精彩內(nèi)容