This article analyzes Flume's data-transfer transactions based on three components: AvroSource, MemoryChannel, and HDFSSink. If you use other components, the concrete transaction handling will differ. In most cases MemoryChannel is good enough (it's what we use at my company): FileChannel is slow, and although it offers log-level data recovery, MemoryChannel generally won't lose data as long as the machine doesn't lose power.
Flume provides transactional operations to guarantee the reliability of user data, mainly in two ways:
- When data is transferred to the next node (usually in batches), if the receiving node fails, e.g. with a network error, the whole batch is rolled back. This can therefore cause data to be resent (duplicates).
- Within a single node, when the Source writes data into the Channel, if any data in the batch fails, the whole batch is not written to the Channel; the partially received data is simply discarded, and the upstream node is relied upon to resend it.
Programming model
A transaction is implemented inside the Channel implementation; every Source and Sink connected to a Channel must obtain a transaction object. Sources manage transactions through a ChannelProcessor, while Sinks manage them directly via their configured Channel. Whenever Flume performs a Put or Take on a Channel, the operation must be wrapped in a transaction, for example:
Channel ch = new MemoryChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
  // This try clause includes whatever Channel operations you want to do
  Event eventToStage = EventBuilder.withBody("Hello Flume!",
      Charset.forName("UTF-8"));
  ch.put(eventToStage);
  // Event takenEvent = ch.take();
  // ...
  txn.commit();
} catch (Throwable t) {
  txn.rollback();
  // Log exception, handle individual exceptions as needed
  // re-throw all Errors
  if (t instanceof Error) {
    throw (Error) t;
  }
} finally {
  txn.close();
}
Put transaction flow
A Put transaction goes through the following stages:
- doPut: first write the batch into the temporary staging buffer putList
- doCommit: check whether the channel's in-memory queue has enough room, and if so merge putList into it
- doRollback: if the channel's in-memory queue lacks space, discard the data and roll back the events
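The three stages above can be condensed into a toy model (hypothetical class and method names, not the real Flume sources; a plain bounded queue stands in for MemoryChannel's internals):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the Put transaction: events are staged in putList and
// merged into the channel queue only if capacity allows. On failure the
// staged batch is simply discarded and upstream must resend.
class PutTxSketch {
    private final Deque<String> queue = new ArrayDeque<>(); // channel's main queue
    private final Deque<String> putList = new ArrayDeque<>(); // staging buffer
    private final int capacity;

    PutTxSketch(int capacity) {
        this.capacity = capacity;
    }

    // doPut: stage the event in the transaction-local buffer
    void doPut(String event) {
        putList.addLast(event);
    }

    // doCommit: merge putList into the queue only if space remains
    boolean doCommit() {
        if (queue.size() + putList.size() > capacity) {
            doRollback();               // not enough room: discard the batch
            return false;
        }
        while (!putList.isEmpty()) {
            queue.addLast(putList.removeFirst());
        }
        return true;
    }

    // doRollback: drop the staged events; the upstream node must resend
    void doRollback() {
        putList.clear();
    }

    int queueSize() {
        return queue.size();
    }
}
```

Note that rollback on the Put side never touches the channel queue itself: only uncommitted, staged data is thrown away.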
Let's analyze the Put transaction by following data from its reception by the Source to its write into the Channel.

AvroSource spawns multiple worker threads to process incoming data. Of the worker's data-handling interfaces, we only look at appendBatch, the batch-processing entry point:
public Status appendBatch(List<AvroFlumeEvent> events) {
  logger.debug("Avro source {}: Received avro event batch of {} events.",
      getName(), events.size());
  sourceCounter.incrementAppendBatchReceivedCount();
  sourceCounter.addToEventReceivedCount(events.size());
  List<Event> batch = new ArrayList<Event>();
  for (AvroFlumeEvent avroEvent : events) {
    Event event = EventBuilder.withBody(avroEvent.getBody().array(),
        toStringMap(avroEvent.getHeaders()));
    batch.add(event);
  }
  try {
    // The ChannelProcessor is handed in when the Source is initialized;
    // it writes the batch into the corresponding Channel(s)
    getChannelProcessor().processEventBatch(batch);
  } catch (Throwable t) {
    logger.error("Avro source " + getName() + ": Unable to process event " +
        "batch. Exception follows.", t);
    sourceCounter.incrementChannelWriteFail();
    if (t instanceof Error) {
      throw (Error) t;
    }
    return Status.FAILED;
  }
  sourceCounter.incrementAppendBatchAcceptedCount();
  sourceCounter.addToEventAcceptedCount(events.size());
  return Status.OK;
}
All the transaction logic lives in the processEventBatch method:
public void processEventBatch(List<Event> events) {
Preconditions.checkNotNull(events, "Event list must not be null");
//Preprocess each event through the interceptor chain
events = interceptorChain.intercept(events);
Map<Channel, List<Event>> reqChannelQueue =
new LinkedHashMap<Channel, List<Event>>();
Map<Channel, List<Event>> optChannelQueue =
new LinkedHashMap<Channel, List<Event>>();
//Classify the events, grouping them by the channels they map to
for (Event event : events) {
List<Channel> reqChannels = selector.getRequiredChannels(event);
for (Channel ch : reqChannels) {
List<Event> eventQueue = reqChannelQueue.get(ch);
if (eventQueue == null) {
eventQueue = new ArrayList<Event>();
reqChannelQueue.put(ch, eventQueue);
}
eventQueue.add(event);
}
List<Channel> optChannels = selector.getOptionalChannels(event);
for (Channel ch : optChannels) {
List<Event> eventQueue = optChannelQueue.get(ch);
if (eventQueue == null) {
eventQueue = new ArrayList<Event>();
optChannelQueue.put(ch, eventQueue);
}
eventQueue.add(event);
}
}
// Process required channels
for (Channel reqChannel : reqChannelQueue.keySet()) {
Transaction tx = reqChannel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
//Transaction begins; tx is an instance of MemoryTransaction
tx.begin();
List<Event> batch = reqChannelQueue.get(reqChannel);
for (Event event : batch) {
// This put actually calls transaction.doPut
reqChannel.put(event);
}
//Commit: merge the data into the Channel's queue
tx.commit();
} catch (Throwable t) {
//Roll back
tx.rollback();
if (t instanceof Error) {
LOG.error("Error while writing to required channel: " + reqChannel, t);
throw (Error) t;
} else if (t instanceof ChannelException) {
throw (ChannelException) t;
} else {
throw new ChannelException("Unable to put batch on required " +
"channel: " + reqChannel, t);
}
} finally {
if (tx != null) {
tx.close();
}
}
}
// Process optional channels
for (Channel optChannel : optChannelQueue.keySet()) {
Transaction tx = optChannel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
tx.begin();
List<Event> batch = optChannelQueue.get(optChannel);
for (Event event : batch) {
optChannel.put(event);
}
tx.commit();
} catch (Throwable t) {
tx.rollback();
LOG.error("Unable to put batch on optional channel: " + optChannel, t);
if (t instanceof Error) {
throw (Error) t;
}
} finally {
if (tx != null) {
tx.close();
}
}
}
}
Each worker thread owns a Transaction instance, kept inside the Channel (BasicChannelSemantics) in the ThreadLocal<BasicTransactionSemantics> field currentTransaction.
So what does the transaction actually do?

In fact, a Transaction instance holds two blocking deques of type LinkedBlockingDeque (a double-ended queue arguably isn't necessary here, since each thread writes to its own putList rather than sharing it across threads?), namely:
- putList
- takeList
A Put transaction, naturally, only uses putList. putList is just a temporary staging buffer: data is first put into putList, and finally the commit method checks whether the channel has enough free space and, if so, merges the buffer into the channel's queue.
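The "full means fail, not block" behavior described above is just LinkedBlockingDeque.offer semantics, which is exactly what makes doPut throw a ChannelException. A minimal demo (PutListDemo is a hypothetical illustration, standard library only):

```java
import java.util.concurrent.LinkedBlockingDeque;

// putList and takeList are bounded LinkedBlockingDeques sized to the
// transaction capacity. offer() returns false instead of blocking when
// the deque is full -- the condition doPut turns into a ChannelException.
class PutListDemo {
    // Stage two events into a putList of capacity 1; return the offer results
    static boolean[] stageTwo() {
        LinkedBlockingDeque<String> putList = new LinkedBlockingDeque<>(1);
        boolean first = putList.offer("event-1");   // fits
        boolean second = putList.offer("event-2");  // full: fails without blocking
        return new boolean[] { first, second };
    }
}
```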
channel.put -> transaction.doPut:
protected void doPut(Event event) throws InterruptedException {
  channelCounter.incrementEventPutAttemptCount();
  // Compute the event's size in byteCapacitySlotSize-byte slots
  int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
  // Stage the event in the temporary buffer putList
  if (!putList.offer(event)) {
    throw new ChannelException(
        "Put queue for MemoryTransaction of capacity " +
        putList.size() + " full, consider committing more frequently, " +
        "increasing capacity or increasing thread count");
  }
  putByteCounter += eventByteSize;
}
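One subtlety worth calling out in doPut's size computation: estimateEventSize returns a long and byteCapacitySlotSize is an int (100 in MemoryChannel), so the division truncates before Math.ceil ever runs, and event sizes are effectively rounded down to whole 100-byte slots. A self-contained demo of the same arithmetic (SlotSizeDemo is a hypothetical name):

```java
// Mirrors the arithmetic in doPut/doTake: because both operands of "/"
// are integral, the division truncates first and the surrounding
// Math.ceil is a no-op -- sizes round DOWN to whole slots.
// 100 matches MemoryChannel's byteCapacitySlotSize constant.
class SlotSizeDemo {
    static final int byteCapacitySlotSize = 100;

    static int slots(long estimatedEventSize) {
        return (int) Math.ceil(estimatedEventSize / byteCapacitySlotSize);
    }
}
```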
transaction.commit -> transaction.doCommit:
protected void doCommit() throws InterruptedException {
//Check whether the channel's queue has enough remaining space
int remainingChange = takeList.size() - putList.size();
if (remainingChange < 0) {
if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
throw new ChannelException("Cannot commit transaction. Byte capacity " +
"allocated to store event body " + byteCapacity * byteCapacitySlotSize +
"reached. Please increase heap space/byte capacity allocated to " +
"the channel as the sinks may not be keeping up with the sources");
}
if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
bytesRemaining.release(putByteCounter);
throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
" Sinks are likely not keeping up with sources, or the buffer size is too tight");
}
}
int puts = putList.size();
int takes = takeList.size();
synchronized (queueLock) {
if (puts > 0) {
while (!putList.isEmpty()) {
//Merge into the channel's queue
if (!queue.offer(putList.removeFirst())) {
throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
}
}
}
//Clear the temporary buffers
putList.clear();
takeList.clear();
}
bytesRemaining.release(takeByteCounter);
takeByteCounter = 0;
putByteCounter = 0;
queueStored.release(puts);
if (remainingChange > 0) {
queueRemaining.release(remainingChange);
}
if (puts > 0) {
channelCounter.addToEventPutSuccessCount(puts);
}
if (takes > 0) {
channelCounter.addToEventTakeSuccessCount(takes);
}
channelCounter.setChannelSize(queue.size());
}
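The semaphore bookkeeping in doCommit can be isolated into a minimal sketch (hypothetical class; the untimed tryAcquire overload stands in for the real keepAlive timeout): queueRemaining starts at the channel capacity and counts free slots, queueStored starts at 0 and counts committed events visible to takers. Committing n puts must move n permits from one to the other.

```java
import java.util.concurrent.Semaphore;

// Minimal model of doCommit's capacity accounting (hypothetical class):
// a commit of n puts acquires n free-slot permits and releases n
// "stored event" permits; if the slots can't be acquired, the commit fails.
class CommitAccounting {
    final Semaphore queueRemaining;             // free slots in the queue
    final Semaphore queueStored = new Semaphore(0); // events available to take

    CommitAccounting(int capacity) {
        queueRemaining = new Semaphore(capacity);
    }

    boolean commitPuts(int puts) {
        // Real doCommit uses tryAcquire(puts, keepAlive, SECONDS); the
        // untimed overload keeps this sketch free of InterruptedException
        if (!queueRemaining.tryAcquire(puts)) {
            return false;           // commit fails: channel is too full
        }
        queueStored.release(puts);  // committed events become takeable
        return true;
    }
}
```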
If an exception occurs during the transaction, for example the channel lacking free space, then rollback:
protected void doRollback() {
  int takes = takeList.size();
  // Check the memory queue has enough space to return takeList
  synchronized (queueLock) {
    Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),
        "Not enough space in memory channel " +
        "queue to rollback takes. This should never happen, please report");
    while (!takeList.isEmpty()) {
      queue.addFirst(takeList.removeLast());
    }
    // Discard the staged puts that were never merged into the channel's queue
    putList.clear();
  }
  putByteCounter = 0;
  takeByteCounter = 0;
  queueStored.release(takes);
  channelCounter.setChannelSize(queue.size());
}
Take transaction flow
A Take transaction goes through the following stages:
- doTake: first move the data into the temporary staging buffer takeList
- doCommit: if all the data was sent successfully, clear the temporary buffer takeList
- doRollback: if an exception occurs while sending the data, roll back by returning the events in takeList to the channel's in-memory queue
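Symmetrically to the Put side, the three Take stages can be condensed into a toy model (hypothetical names, not the real Flume classes):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the Take transaction: events move from the channel queue
// into takeList; commit discards the staged copies, rollback pushes them
// back onto the FRONT of the queue so ordering is preserved.
class TakeTxSketch {
    private final Deque<String> queue = new ArrayDeque<>();    // channel queue
    private final Deque<String> takeList = new ArrayDeque<>(); // staging buffer

    void offer(String event) {
        queue.addLast(event);
    }

    // doTake: move one event into the staging buffer
    String doTake() {
        String event = queue.pollFirst();
        if (event != null) {
            takeList.addLast(event);
        }
        return event;
    }

    // doCommit: the sink delivered everything; drop the staged copies
    void doCommit() {
        takeList.clear();
    }

    // doRollback: return staged events to the head of the queue, in order
    void doRollback() {
        while (!takeList.isEmpty()) {
            queue.addFirst(takeList.removeLast());
        }
    }

    String peek() {
        return queue.peekFirst();
    }

    int queueSize() {
        return queue.size();
    }
}
```

Walking backwards through takeList with removeLast/addFirst is what keeps the original event order intact after a rollback.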
A Sink is actually driven by a SinkRunner thread calling Sink.process to handle data. Every Sink class has a process method containing its data-transfer logic; we start from HdfsEventSink's process method:
public Status process() throws EventDeliveryException {
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
//Transaction begins
transaction.begin();
try {
Set<BucketWriter> writers = new LinkedHashSet<>();
int txnEventCount = 0;
for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
//Take an event into the staging buffer; actually calls transaction.doTake
Event event = channel.take();
if (event == null) {
break;
}
// reconstruct the path name by substituting place holders
String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
timeZone, needRounding, roundUnit, roundValue, useLocalTime);
String realName = BucketPath.escapeString(fileName, event.getHeaders(),
timeZone, needRounding, roundUnit, roundValue, useLocalTime);
String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
BucketWriter bucketWriter;
HDFSWriter hdfsWriter = null;
// Callback to remove the reference to the bucket writer from the
// sfWriters map so that all buffers used by the HDFS file
// handles are garbage collected.
WriterCallback closeCallback = new WriterCallback() {
@Override
public void run(String bucketPath) {
LOG.info("Writer callback called.");
synchronized (sfWritersLock) {
sfWriters.remove(bucketPath);
}
}
};
synchronized (sfWritersLock) {
bucketWriter = sfWriters.get(lookupPath);
// we haven't seen this file yet, so open it and cache the handle
if (bucketWriter == null) {
hdfsWriter = writerFactory.getWriter(fileType);
bucketWriter = initializeBucketWriter(realPath, realName,
lookupPath, hdfsWriter, closeCallback);
sfWriters.put(lookupPath, bucketWriter);
}
}
// Write the data to HDFS
try {
//Write the event to HDFS
bucketWriter.append(event);
} catch (BucketClosedException ex) {
LOG.info("Bucket was closed while trying to append, " +
"reinitializing bucket and writing event.");
hdfsWriter = writerFactory.getWriter(fileType);
bucketWriter = initializeBucketWriter(realPath, realName,
lookupPath, hdfsWriter, closeCallback);
synchronized (sfWritersLock) {
sfWriters.put(lookupPath, bucketWriter);
}
bucketWriter.append(event);
}
// track the buckets getting written in this transaction
if (!writers.contains(bucketWriter)) {
writers.add(bucketWriter);
}
}
if (txnEventCount == 0) {
sinkCounter.incrementBatchEmptyCount();
} else if (txnEventCount == batchSize) {
sinkCounter.incrementBatchCompleteCount();
} else {
sinkCounter.incrementBatchUnderflowCount();
}
// flush all pending buckets before committing the transaction
for (BucketWriter bucketWriter : writers) {
bucketWriter.flush();
}
//Commit the transaction
transaction.commit();
if (txnEventCount < 1) {
return Status.BACKOFF;
} else {
sinkCounter.addToEventDrainSuccessCount(txnEventCount);
return Status.READY;
}
} catch (IOException eIO) {
transaction.rollback();
LOG.warn("HDFS IO error", eIO);
sinkCounter.incrementEventWriteFail();
return Status.BACKOFF;
} catch (Throwable th) {
transaction.rollback();
LOG.error("process failed", th);
sinkCounter.incrementEventWriteOrChannelFail(th);
if (th instanceof Error) {
throw (Error) th;
} else {
throw new EventDeliveryException(th);
}
} finally {
//Close the transaction
transaction.close();
}
}
The rough flow: take events into takeList, write them to HDFS, flush, then commit; on any failure, roll back.

Next, look at channel.take, which moves data into the temporary staging buffer; it actually calls transaction.doTake:
protected Event doTake() throws InterruptedException {
  channelCounter.incrementEventTakeAttemptCount();
  if (takeList.remainingCapacity() == 0) {
    throw new ChannelException("Take list for MemoryTransaction, capacity " +
        takeList.size() + " full, consider committing more frequently, " +
        "increasing capacity, or increasing thread count");
  }
  if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
    return null;
  }
  Event event;
  synchronized (queueLock) {
    // Take an event from the channel's in-memory queue
    event = queue.poll();
  }
  Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
      "signalling existence of entry");
  // Move the event into the temporary staging buffer takeList
  takeList.put(event);
  int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
  takeByteCounter += eventByteSize;
  return event;
}
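The gate at the top of doTake is the queueStored semaphore: one permit per committed event, and a timed tryAcquire bounded by keepAlive, so on an empty channel the sink sees null rather than blocking forever. A minimal sketch (hypothetical class; milliseconds instead of seconds to keep the demo fast):

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Models the gate at the top of doTake: queueStored holds one permit per
// committed event, so a timed tryAcquire either claims an event or
// reports the channel as (currently) empty by returning null.
class TakeGateDemo {
    final Semaphore queueStored = new Semaphore(0);

    String tryTake(long keepAliveMillis) {
        try {
            if (!queueStored.tryAcquire(keepAliveMillis, TimeUnit.MILLISECONDS)) {
                return null;   // nothing committed within keepAlive
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        return "event";        // stand-in for queue.poll()
    }
}
```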

