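Metadata
There are two main kinds of metadata:
- The list of available bookies
  - used to track which bookies are currently available
- Ledger metadata
The related operations are:
- createLedger: creates a new ledger, which has a unique ID and a current version (on zk, this version is the znode's dataVersion);
- removeLedgerMetadata: removes a ledger; the caller must supply the version it holds locally, which is checked against the version in the metadata storage, and the removal is allowed only if the two match;
- readLedgerMetadata: reads a ledger's metadata and, at the same time, updates the locally held version of that metadata;
- writeLedgerMetadata: updates a ledger's metadata; likewise, the caller must supply its locally held version, which must match the version in the metadata storage for the update to proceed;
- asyncProcessLedgers: iterates over all current ledgers and applies a given processing function to each one;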
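The version check behind removeLedgerMetadata and writeLedgerMetadata is an optimistic check-and-set. The sketch below illustrates it with an in-memory map; the class VersionedLedgerStore and its methods are hypothetical and not part of BookKeeper. As with a znode's dataVersion, a new ledger starts at version 0 and every successful write increments it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory metadata store illustrating version-checked (CAS) updates.
class VersionedLedgerStore {
    // Immutable (metadata, version) pair, comparable to a znode's data plus its dataVersion.
    static final class Entry {
        final String metadata;
        final long version;

        Entry(String metadata, long version) {
            this.metadata = metadata;
            this.version = version;
        }
    }

    private final Map<Long, Entry> ledgers = new ConcurrentHashMap<>();

    // createLedger: fails if the id is already taken; a new ledger starts at version 0.
    long create(long ledgerId, String metadata) {
        if (ledgers.putIfAbsent(ledgerId, new Entry(metadata, 0)) != null) {
            throw new IllegalStateException("ledger already exists: " + ledgerId);
        }
        return 0;
    }

    // readLedgerMetadata: returns the metadata together with its current version.
    Entry read(long ledgerId) {
        Entry e = ledgers.get(ledgerId);
        if (e == null) {
            throw new IllegalStateException("no such ledger: " + ledgerId);
        }
        return e;
    }

    // writeLedgerMetadata: applies the update only if expectedVersion is still current.
    long write(long ledgerId, String metadata, long expectedVersion) {
        Entry current = read(ledgerId);
        if (current.version != expectedVersion) {
            throw new IllegalStateException("bad version: expected " + expectedVersion
                    + ", found " + current.version);
        }
        Entry next = new Entry(metadata, expectedVersion + 1);
        // replace(key, old, new) is atomic, so a concurrent writer cannot slip in between.
        if (!ledgers.replace(ledgerId, current, next)) {
            throw new IllegalStateException("concurrent update of ledger " + ledgerId);
        }
        return next.version;
    }

    // removeLedgerMetadata: deletes only if expectedVersion is still current.
    void remove(long ledgerId, long expectedVersion) {
        Entry current = read(ledgerId);
        if (current.version != expectedVersion || !ledgers.remove(ledgerId, current)) {
            throw new IllegalStateException("bad version for ledger " + ledgerId);
        }
    }
}
```

A writer that loses the race gets an error instead of silently overwriting newer metadata; it has to re-read the ledger and retry with the fresh version.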
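Choosing a metadata storage
- It must first meet the following requirements:
  - Support CAS (Check and Set) operations, such as the version comparison required before the remove and update operations above;
  - Be optimized for sequential writes;
  - Be optimized for scans;
- Suitable metadata stores today include ZooKeeper and etcd; if the number of ledgers is extremely large, HBase is also an option;
- Apache BookKeeper currently uses ZooKeeper by default;
Implementing the metadata operations
MetadataBookieDriver
In Apache BookKeeper, all metadata operations are encapsulated in an abstract interface, MetadataBookieDriver;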
public interface MetadataBookieDriver extends AutoCloseable {
    // Initialize this driver
    MetadataBookieDriver initialize(ServerConfiguration conf,
                                    RegistrationListener listener,
                                    StatsLogger statsLogger)
        throws MetadataException;
    // The scheme this driver handles, e.g. "zk"
    String getScheme();
    // RegistrationManager handles registering bookies with the metadata storage
    RegistrationManager getRegistrationManager();
    LedgerManagerFactory getLedgerManagerFactory()
        throws MetadataException;
    LayoutManager getLayoutManager();
    @Override
    void close();
}
MetadataDrivers
Manages all MetadataBookieDriver implementations
- All driver information is kept in the field private static final ConcurrentMap<String, MetadataBookieDriverInfo> bookieDrivers, whose key is the scheme and whose value is a MetadataBookieDriverInfo, defined as follows:
static class MetadataBookieDriverInfo {
    final Class<? extends MetadataBookieDriver> driverClass;
    final String driverClassName;
    MetadataBookieDriverInfo(Class<? extends MetadataBookieDriver> driverClass) {
        this.driverClass = driverClass;
        this.driverClassName = driverClass.getName();
    }
}
Given driverClass, Java reflection can create the corresponding MetadataBookieDriver instance;
- Registered by default:
  org.apache.bookkeeper.meta.zk.ZKMetadataBookieDriver, i.e. ZKMetadataBookieDriver, whose scheme is zk
- Obtaining a MetadataBookieDriver
public static MetadataBookieDriver getBookieDriver(URI uri) {
    // For zk the URI looks like: zk+hierarchical://10.1.1.1:2181;10.1.1.2:2181;10.1.1.3:2181/ledgers
    String scheme = uri.getScheme();
    // scheme is "zk+hierarchical"; normalize it to lower case
    scheme = scheme.toLowerCase();
    // only the part before '+' ("zk") selects the driver
    String[] schemeParts = StringUtils.split(scheme, '+');
    if (!initialized) {
        initialize();
    }
    MetadataBookieDriverInfo driverInfo = bookieDrivers.get(schemeParts[0]);
    if (null == driverInfo) {
        throw new IllegalArgumentException("Unknown backend " + scheme);
    }
    // instantiate the driver class via Java reflection
    return ReflectionUtils.newInstance(driverInfo.driverClass);
}
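The scheme lookup and reflective instantiation can be sketched without any BookKeeper classes. SimpleDriver, ZkSimpleDriver and SimpleDriverRegistry are hypothetical stand-ins for MetadataBookieDriver, ZKMetadataBookieDriver and MetadataDrivers; the sketch assumes, as above, that only the part of the URI scheme before '+' selects the driver.

```java
import java.net.URI;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for MetadataBookieDriver.
interface SimpleDriver {
    String getScheme();
}

// Hypothetical stand-in for ZKMetadataBookieDriver; reflection needs a public no-arg constructor.
class ZkSimpleDriver implements SimpleDriver {
    public ZkSimpleDriver() {
    }

    @Override
    public String getScheme() {
        return "zk";
    }
}

// Hypothetical stand-in for MetadataDrivers: a scheme -> driver class registry.
final class SimpleDriverRegistry {
    private static final Map<String, Class<? extends SimpleDriver>> DRIVERS = new ConcurrentHashMap<>();

    private SimpleDriverRegistry() {
    }

    static void register(String scheme, Class<? extends SimpleDriver> driverClass) {
        DRIVERS.putIfAbsent(scheme.toLowerCase(Locale.ROOT), driverClass);
    }

    static SimpleDriver getDriver(URI uri) {
        String scheme = uri.getScheme();
        if (scheme == null) {
            throw new IllegalArgumentException("No scheme in " + uri);
        }
        // "zk+hierarchical" -> "zk": only the part before '+' selects the driver.
        String backend = scheme.toLowerCase(Locale.ROOT).split("\\+")[0];
        Class<? extends SimpleDriver> driverClass = DRIVERS.get(backend);
        if (driverClass == null) {
            throw new IllegalArgumentException("Unknown backend " + scheme);
        }
        try {
            // Reflection: instantiate the registered class through its no-arg constructor.
            return driverClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + driverClass.getName(), e);
        }
    }
}
```

With this design, supporting another backend only means registering one more scheme-to-class entry; callers never reference the concrete driver class.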
ZKMetadataBookieDriver implementation
- getRegistrationManager: returns a ZKRegistrationManager, creating it on the first call
if (null == regManager) {
    regManager = new ZKRegistrationManager(
        serverConf,
        zk,
        listener
    );
}
return regManager;
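- initialize: mostly delegates to the initialize method of its parent class ZKMetadataDriverBase
  Its main job is to create the ZooKeeper client used to talk to zk and the ZkLayoutManager; it also creates the read-only bookie registration node if it does not exist yet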
protected void initialize(AbstractConfiguration<?> conf,
                          StatsLogger statsLogger,
                          RetryPolicy zkRetryPolicy,
                          Optional<Object> optionalCtx) throws MetadataException {
    this.conf = conf;
    this.acls = ZkUtils.getACLs(conf);
    if (optionalCtx.isPresent()
        ...
    } else {
        final String metadataServiceUriStr;
        try {
            metadataServiceUriStr = conf.getMetadataServiceUri();
        } catch (ConfigurationException e) {
            throw new MetadataException(
                Code.INVALID_METADATA_SERVICE_URI, e);
        }
        ...
        final String zkServers = getZKServersFromServiceUri(metadataServiceUri);
        try {
            this.zk = ZooKeeperClient.newBuilder()
                .connectString(zkServers)
                .sessionTimeoutMs(conf.getZkTimeout())
                .operationRetryPolicy(zkRetryPolicy)
                .requestRateLimit(conf.getZkRequestRateLimit())
                .statsLogger(statsLogger)
                .build();
            if (null == zk.exists(bookieReadonlyRegistrationPath, false)) {
                try {
                    zk.create(bookieReadonlyRegistrationPath,
                        EMPTY_BYTE_ARRAY,
                        acls,
                        CreateMode.PERSISTENT);
                } catch (KeeperException.NodeExistsException e) {
                    // another client just created the node; nothing to do
                } catch (KeeperException.NoNodeException e) {
                    // the parent path does not exist yet (cluster not initialized); skip
                }
            }
        } catch (IOException | KeeperException e) {
            throw new MetadataException(
                Code.METADATA_SERVICE_ERROR,
                "Failed to create zookeeper client to " + zkServers, e);
        } catch (InterruptedException e) {
            // restore the interrupt flag; this method may only throw MetadataException
            Thread.currentThread().interrupt();
            throw new MetadataException(
                Code.METADATA_SERVICE_ERROR,
                "Interrupted while creating zookeeper client to " + zkServers, e);
        }
        this.ownZKHandle = true;
    }
    // once the zookeeper client is created, create the layout manager
    this.layoutManager = new ZkLayoutManager(
        zk,
        ledgersRootPath,
        acls);
}
- getLedgerManagerFactory: inherited unchanged from the parent class ZKMetadataDriverBase; returns a LedgerManagerFactory (created once and cached), which is used to create LedgerManagers
public synchronized LedgerManagerFactory getLedgerManagerFactory()
        throws MetadataException {
    if (null == lmFactory) {
        try {
            lmFactory = AbstractZkLedgerManagerFactory.newLedgerManagerFactory(
                conf,
                layoutManager);
        } catch (IOException e) {
            throw new MetadataException(
                Code.METADATA_SERVICE_ERROR, "Failed to initialize ledger manager factory", e);
        } catch (InterruptedException e) {
            // restore the interrupt flag; this method may only throw MetadataException
            Thread.currentThread().interrupt();
            throw new MetadataException(
                Code.METADATA_SERVICE_ERROR, "Interrupted while initializing ledger manager factory", e);
        }
    }
    return lmFactory;
}
ZKRegistrationManager
- Mainly used to register the current bookie on zk as an ephemeral node, unregister it, and write and read cookies
- Registering a bookie
  doRegisterBookie: creates an ephemeral node on zk
private void doRegisterBookie(String regPath) throws BookieException {
    // ZK ephemeral node for this Bookie.
    try {
        // If a node left by an earlier session still exists, wait for it to expire;
        // returns true if the node already exists and belongs to this session.
        if (!checkRegNodeAndWaitExpired(regPath)) {
            // Create the ZK ephemeral node for this Bookie.
            zk.create(regPath, new byte[0], zkAcls, CreateMode.EPHEMERAL);
            zkRegManagerInitialized = true;
        }
    } catch (KeeperException ke) {
        throw new MetadataStoreException(ke);
    } catch (InterruptedException ie) {
        // restore the interrupt flag before propagating the failure
        Thread.currentThread().interrupt();
        throw new MetadataStoreException(ie);
    } catch (IOException e) {
        throw new MetadataStoreException(e);
    }
}
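Registering a bookie as an ephemeral node means the registration disappears by itself when the bookie's zk session ends. The in-memory model below is a hypothetical illustration of that behavior, not ZooKeeper or BookKeeper code. It assumes registration paths of the form /ledgers/available/<host:port>. Where doRegisterBookie first waits, via checkRegNodeAndWaitExpired, for a node left by another session to expire, this sketch simply rejects the registration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of ZooKeeper ephemeral nodes: each node remembers the session that
// created it, and ending that session removes the node.
class EphemeralRegistry {
    private final Map<String, Long> ownerByPath = new ConcurrentHashMap<>();

    // Like doRegisterBookie: create the node unless this session already owns it.
    void register(String path, long sessionId) {
        Long owner = ownerByPath.putIfAbsent(path, sessionId);
        if (owner != null && owner != sessionId) {
            // A node held by another session: this sketch rejects it instead of waiting.
            throw new IllegalStateException(path + " is still held by session " + owner);
        }
    }

    // Session close or expiry: every ephemeral node owned by the session disappears.
    void endSession(long sessionId) {
        ownerByPath.values().removeIf(owner -> owner == sessionId);
    }

    boolean isRegistered(String path) {
        return ownerByPath.containsKey(path);
    }
}
```

This is why a crashed bookie drops out of the available list without any explicit unregister call: once its session expires, its node is gone and a restarted bookie can register again.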
- Writing cookies
  writeCookie: each bookie's cookie is written to a znode such as /ledgers/cookies/10.209.1.1:3181; a cookie looks like this:
4 ---- layout version of the cookie
bookieHost: "10.209.240.36:3181"
journalDir: "/data/bookkeeper/journal"
ledgerDirs: "1\t/data/bookkeeper/ledger"
instanceId: "eb314bf8-885e-4c60-803d-32fd7858d790" ---- ID of the current cluster
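To make the cookie layout concrete, the sketch below renders the fields shown above into that text form. CookieSketch is hypothetical and formats the lines by hand; it is not BookKeeper's Cookie class. Based on the ledgerDirs value above, it assumes ledgerDirs is encoded as the number of directories followed by each directory, separated by tabs, with the tab printed as \t inside the quotes.

```java
import java.util.List;

// Hypothetical sketch that renders cookie fields in the text layout shown above.
final class CookieSketch {
    final int layoutVersion;
    final String bookieHost;
    final String journalDir;
    final List<String> ledgerDirs;
    final String instanceId;

    CookieSketch(int layoutVersion, String bookieHost, String journalDir,
                 List<String> ledgerDirs, String instanceId) {
        this.layoutVersion = layoutVersion;
        this.bookieHost = bookieHost;
        this.journalDir = journalDir;
        this.ledgerDirs = ledgerDirs;
        this.instanceId = instanceId;
    }

    // First line: layout version; then one quoted key/value line per field.
    String encode() {
        // ledgerDirs: directory count, then each directory, tab-separated.
        StringBuilder dirs = new StringBuilder().append(ledgerDirs.size());
        for (String dir : ledgerDirs) {
            dirs.append('\t').append(dir);
        }
        return layoutVersion + "\n"
                + "bookieHost: " + quote(bookieHost) + "\n"
                + "journalDir: " + quote(journalDir) + "\n"
                + "ledgerDirs: " + quote(dirs.toString()) + "\n"
                + "instanceId: " + quote(instanceId) + "\n";
    }

    // Escape backslashes, quotes and tabs so every value stays on one line.
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"").replace("\t", "\\t") + "\"";
    }
}
```

Because instanceId is part of the cookie, a bookie whose stored cookie carries a different cluster ID can be detected as belonging to another cluster.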
- Initializing a new cluster
  initNewCluster:
  - creates the /ledgers node
  - creates the /ledgers/available node
  - creates the /ledgers/available/readonly node
  - creates the /ledgers/INSTANCEID node
  - creates a new LedgerManagerFactory
public boolean initNewCluster() throws Exception {
    String zkServers = ZKMetadataDriverBase.resolveZkServers(conf);
    String instanceIdPath = ledgersRootPath + "/" + INSTANCEID;
    boolean ledgerRootExists = null != zk.exists(ledgersRootPath, false);
    if (ledgerRootExists) {
        // the cluster already exists; do not initialize it again
        return false;
    }
    List<Op> multiOps = Lists.newArrayListWithExpectedSize(4);
    // Create ledgers root node
    multiOps.add(Op.create(ledgersRootPath, EMPTY_BYTE_ARRAY, zkAcls, CreateMode.PERSISTENT));
    // create available bookies node
    multiOps.add(Op.create(bookieRegistrationPath, EMPTY_BYTE_ARRAY, zkAcls, CreateMode.PERSISTENT));
    // create readonly bookies node
    multiOps.add(Op.create(
        bookieReadonlyRegistrationPath,
        EMPTY_BYTE_ARRAY,
        zkAcls,
        CreateMode.PERSISTENT));
    // create INSTANCEID
    String instanceId = UUID.randomUUID().toString();
    multiOps.add(Op.create(instanceIdPath, instanceId.getBytes(UTF_8),
        zkAcls, CreateMode.PERSISTENT));
    // execute the multi ops
    // multi() groups the operations on several nodes into one atomic transaction:
    // either all of them succeed or all of them fail
    zk.multi(multiOps);
    // creates the new layout and stores in zookeeper
    // if /ledgers/LAYOUT on zk has no data yet and layoutManager is not null,
    // this call writes new /ledgers/LAYOUT data
    AbstractZkLedgerManagerFactory.newLedgerManagerFactory(conf, layoutManager);
    return true;
}
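The all-or-nothing behavior of zk.multi() that initNewCluster relies on can be modeled with a small in-memory tree. AtomicTree is a hypothetical sketch, not the ZooKeeper API: it validates every create against a staged copy (the node must not exist yet, its parent must) and commits the batch only if all of them pass.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory tree that mimics the all-or-nothing semantics of zk.multi()
// for a batch of create operations.
class AtomicTree {
    private final Map<String, byte[]> nodes = new HashMap<>();

    AtomicTree() {
        nodes.put("/", new byte[0]); // the root always exists
    }

    private static String parentOf(String path) {
        int i = path.lastIndexOf('/');
        return i == 0 ? "/" : path.substring(0, i);
    }

    synchronized boolean exists(String path) {
        return nodes.containsKey(path);
    }

    // ops are applied in iteration order, so pass an ordered map (e.g. LinkedHashMap).
    // Every create is checked against a staged copy; the live tree changes only if all pass.
    synchronized void multiCreate(Map<String, byte[]> ops) {
        Map<String, byte[]> staged = new HashMap<>(nodes);
        for (Map.Entry<String, byte[]> op : ops.entrySet()) {
            String path = op.getKey();
            if (staged.containsKey(path)) {
                throw new IllegalStateException("NodeExists: " + path);
            }
            if (!staged.containsKey(parentOf(path))) {
                throw new IllegalStateException("NoNode: " + parentOf(path));
            }
            staged.put(path, op.getValue());
        }
        // Commit point: nothing above touched the live map.
        nodes.clear();
        nodes.putAll(staged);
    }
}
```

Because the whole batch fails when /ledgers already exists, two clients racing to initialize the same cluster cannot leave a half-created layout behind.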
LedgerManagerFactory
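- As mentioned earlier, the most important metadata stored on zk is ledger metadata. There may be only a few ledgers or a huge number of them, so storing them all on zk requires a sensible organization. The main layouts are:
  - Flat Ledger Layout: all ledgers are stored under a single znode (e.g. /ledgers); each ledger node is named "L" followed by its id, e.g. "/ledgers/L001". The problem with this layout is that when there are too many ledgers, the result of zk's getChildren call that lists all of them exceeds zk's maximum packet size, so the call fails;
  - Hierarchical ledger manager: hierarchical storage. A globally unique ledger id is first generated with a zk EPHEMERAL_SEQUENTIAL znode; such an id has 10 digits, e.g. 0000000001, and is split into levels as /ledgers/00/0000/L0001, the znode that stores the corresponding ledger's metadata;
  - LongHierarchical ledger manager: the ledger ids above are 31 bits, while these are 63 bits; on zk the path looks like
    /ledgers/000/0000/0000/0000/L0001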
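The id-to-path mapping of the two hierarchical layouts can be written out directly. This sketch reproduces the examples above: a 2-4-4 split of a 10-digit id, and a 3-4-4-4-4 split of a 19-digit id, which is enough for any positive 63-bit value. LedgerPaths is a hypothetical helper, and the root /ledgers is assumed.

```java
import java.util.Locale;

// Hypothetical helper reproducing the ledger-id-to-path examples above.
final class LedgerPaths {
    private LedgerPaths() {
    }

    // Hierarchical: zero-pad to 10 digits, split 2-4-4, prefix the last level with "L".
    // Valid for ids below 10^10, which covers every 31-bit id.
    static String hierarchical(String root, long ledgerId) {
        String id = String.format(Locale.ROOT, "%010d", ledgerId);
        return root + "/" + id.substring(0, 2) + "/" + id.substring(2, 6) + "/L" + id.substring(6, 10);
    }

    // LongHierarchical: zero-pad to 19 digits (enough for Long.MAX_VALUE), split 3-4-4-4-4.
    static String longHierarchical(String root, long ledgerId) {
        String id = String.format(Locale.ROOT, "%019d", ledgerId);
        return root + "/" + id.substring(0, 3) + "/" + id.substring(3, 7) + "/" + id.substring(7, 11)
                + "/" + id.substring(11, 15) + "/L" + id.substring(15, 19);
    }
}
```

For example, LedgerPaths.hierarchical("/ledgers", 1234567890L) gives /ledgers/12/3456/L7890. Each intermediate znode has at most 10,000 children, so no single getChildren call has to return every ledger.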
- LedgerManagerFactory: creates LedgerManager instances; its class hierarchy is as follows
  [Figure: LedgerManagerFactory class hierarchy (ledger-factory-classes1.png)]
- format: deletes all ledger metadata on zk, deletes /ledgers/LAYOUT, and writes new layout information
public void format(AbstractConfiguration<?> conf, LayoutManager layoutManager)
        throws InterruptedException, KeeperException, IOException {
    try (AbstractZkLedgerManager ledgerManager = (AbstractZkLedgerManager) newLedgerManager()) {
        String ledgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
        List<String> children = zk.getChildren(ledgersRootPath, false);
        for (String child : children) {
            // With the hierarchical layout, ledger metadata lives under znodes such as /ledgers/00;
            // this loop deletes all of it while keeping the special znodes
            if (!AbstractZkLedgerManager.isSpecialZnode(child) && ledgerManager.isLedgerParentNode(child)) {
                ZKUtil.deleteRecursive(zk, ledgersRootPath + "/" + child);
            }
        }
    }
    Class<? extends LedgerManagerFactory> factoryClass;
    try {
        factoryClass = conf.getLedgerManagerFactoryClass();
    } catch (ConfigurationException e) {
        throw new IOException("Failed to get ledger manager factory class from configuration : ", e);
    }
    // delete /ledgers/LAYOUT on zk
    layoutManager.deleteLedgerLayout();
    // Create new layout information again.
    // write the new layout to /ledgers/LAYOUT on zk
    createNewLMFactory(conf, layoutManager, factoryClass);
}
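The child filter in format() can be illustrated in isolation. FormatFilter is hypothetical: it assumes the special znodes are the ones this article mentions (available, cookies, LAYOUT, INSTANCEID) and that, under the hierarchical layout, a ledger parent node is a two-digit name such as 00. The real isSpecialZnode and isLedgerParentNode may check more than this.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical version of the child filter in format(): delete ledger parent nodes only.
final class FormatFilter {
    // Assumed special znodes, taken from the names used in this article.
    private static final Set<String> SPECIAL =
            new HashSet<>(Arrays.asList("available", "cookies", "LAYOUT", "INSTANCEID"));

    private FormatFilter() {
    }

    // Assumption: under the hierarchical layout a ledger parent is a two-digit name like "00".
    static boolean isLedgerParentNode(String child) {
        return child.matches("[0-9]{2}");
    }

    // Children of the ledgers root that format() would delete recursively.
    static List<String> childrenToDelete(List<String> children) {
        return children.stream()
                .filter(c -> !SPECIAL.contains(c) && isLedgerParentNode(c))
                .collect(Collectors.toList());
    }
}
```

Anything that is neither special nor a ledger parent is left alone, so the bookie registrations and cookies survive a format.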
- validateAndNukeExistingCluster: removes all of the cluster's nodes from zk
- newLedgerIdGenerator: returns a ledger id generator:
public LedgerIdGenerator newLedgerIdGenerator() {
    List<ACL> zkAcls = ZkUtils.getACLs(conf);
    String zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
    ZkLedgerIdGenerator subIdGenerator = new ZkLedgerIdGenerator(zk, zkLedgersRootPath,
        LegacyHierarchicalLedgerManager.IDGEN_ZNODE, zkAcls);
    return new LongZkLedgerIdGenerator(zk, zkLedgersRootPath, LongHierarchicalLedgerManager.IDGEN_ZNODE,
        subIdGenerator, zkAcls);
}
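It can generate both 31-bit and 63-bit ids, which looks sufficient for now. I won't cover the implementation here; see the source if you are interested. Both generators rely on zk's EPHEMERAL_SEQUENTIAL znodes;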
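How an EPHEMERAL_SEQUENTIAL znode yields an id can be simulated without a ZooKeeper server. When such a node is created, ZooKeeper appends a 10-digit, zero-padded sequence number kept by the parent node; that counter is a signed 32-bit int, which is where the 31-bit limit comes from. SequentialIdSketch is hypothetical, and its prefix /ledgers/idgen/ID- is only an example path.

```java
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simulation of ledger id generation with EPHEMERAL_SEQUENTIAL znodes.
final class SequentialIdSketch {
    // Stands in for the sequence counter ZooKeeper keeps on the parent znode.
    private final AtomicInteger sequence = new AtomicInteger();
    private final String prefix;

    SequentialIdSketch(String prefix) {
        this.prefix = prefix;
    }

    // The path that creating "prefix" in sequential mode would return: prefix + 10-digit counter.
    String createSequential() {
        return prefix + String.format(Locale.ROOT, "%010d", sequence.getAndIncrement());
    }

    // The generator creates such a node and parses its numeric suffix as the new ledger id.
    long nextLedgerId() {
        String created = createSequential();
        return Long.parseLong(created.substring(prefix.length()));
    }
}
```

Uniqueness comes for free because ZooKeeper, not the client, hands out the counter; concurrent clients never see the same suffix.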
- newLedgerManager: creates the LedgerManager object
public LedgerManager newLedgerManager() {
    return new HierarchicalLedgerManager(conf, zk);
}
LedgerManager
- First, the class hierarchy and the interface methods that must be implemented
  [Figure: LedgerManager class hierarchy (ledgermanager-hi.png)]
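- createLedgerMetadata: asynchronously creates a new ledger and returns a CompletableFuture<...>;
  when the metadata format version is greater than 2, a ctoken is added to the ledger metadata, so that if the create reports that the node already exists, the client can tell whether the existing node was written by its own earlier attempt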
public CompletableFuture<Versioned<LedgerMetadata>> createLedgerMetadata(long ledgerId,
                                                                        LedgerMetadata inputMetadata) {
    CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
    /*
     * When the metadata format version is greater than 2, add a ctoken to the ledger metadata
     */
    final long cToken = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
    final LedgerMetadata metadata;
    if (inputMetadata.getMetadataFormatVersion() > LedgerMetadataSerDe.METADATA_FORMAT_VERSION_2) {
        metadata = LedgerMetadataBuilder.from(inputMetadata).withCToken(cToken).build();
    } else {
        metadata = inputMetadata;
    }
    String ledgerPath = getLedgerPath(ledgerId);
    // scb is the callback invoked once the zk create finishes
    StringCallback scb = new StringCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            if (rc == Code.OK.intValue()) {
                // the ledger was created successfully
                promise.complete(new Versioned<>(metadata, new LongVersion(0)));
            } else if (rc == Code.NODEEXISTS.intValue()) {
                // the ledger node already exists
                if (metadata.getMetadataFormatVersion() > 2) {
                    // read the metadata currently stored for this ledger
                    CompletableFuture<Versioned<LedgerMetadata>> readFuture = readLedgerMetadata(ledgerId);
                    readFuture.handle((readMetadata, exception) -> {
                        if (exception == null) {
                            // a matching ctoken means the node was written by this same create
                            if (readMetadata.getValue().getCToken() == cToken) {
                                FutureUtils.complete(promise, new Versioned<>(metadata, new LongVersion(0)));
                            } else {
                                promise.completeExceptionally(new BKException.BKLedgerExistException());
                            }
                        } else if (exception instanceof KeeperException.NoNodeException) {
                            promise.completeExceptionally(new BKException.BKLedgerExistException());
                        } else {
                            promise.completeExceptionally(new BKException.ZKException());
                        }
                        return null;
                    });
                } else {
                    promise.completeExceptionally(new BKException.BKLedgerExistException());
                }
            } else {
                promise.completeExceptionally(new BKException.ZKException());
            }
        }
    };
    final byte[] data;
    try {
        data = serDe.serialize(metadata);
    } catch (IOException ioe) {
        promise.completeExceptionally(new BKException.BKMetadataSerializationException(ioe));
        return promise;
    }
    List<ACL> zkAcls = ZkUtils.getACLs(conf);
    // create the ledger node asynchronously; missing parent nodes are created recursively
    ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPath, data, zkAcls,
        CreateMode.PERSISTENT, scb, null);
    return promise;
}
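The ctoken logic in createLedgerMetadata amounts to an idempotent create. The hypothetical CTokenCreateSketch below keeps only the ctoken of each ledger in memory. If the node already exists but carries the caller's own ctoken (for example because an earlier attempt of the same create actually succeeded), the create counts as a success; otherwise it fails as "ledger exists".

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of the ctoken check: only the ctoken of each ledger is stored.
final class CTokenCreateSketch {
    private final Map<Long, Long> cTokenByLedger = new ConcurrentHashMap<>();

    // Stand-in for the zk create; returns false (like NODEEXISTS) if the ledger node exists.
    private boolean rawCreate(long ledgerId, long cToken) {
        return cTokenByLedger.putIfAbsent(ledgerId, cToken) == null;
    }

    CompletableFuture<Long> create(long ledgerId, long cToken) {
        CompletableFuture<Long> promise = new CompletableFuture<>();
        if (rawCreate(ledgerId, cToken)) {
            promise.complete(cToken);
        } else if (Long.valueOf(cToken).equals(cTokenByLedger.get(ledgerId))) {
            // The existing node carries our ctoken, so our own earlier attempt created it.
            promise.complete(cToken);
        } else {
            promise.completeExceptionally(new IllegalStateException("ledger exists: " + ledgerId));
        }
        return promise;
    }

    // Same idea as the source: a random non-negative long per create call.
    static long newCToken() {
        return ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
    }
}
```

The random ctoken makes it very unlikely that two different creators collide, so a NODEEXISTS answer can be resolved by a single read instead of failing the retried request.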
- removeLedgerMetadata: asynchronously deletes a ledger's metadata; besides the ledger id, the caller must supply the ledger's data version on zk, which is passed to zk.delete
- readLedgerMetadata: asynchronously reads a ledger's metadata
protected CompletableFuture<Versioned<LedgerMetadata>> readLedgerMetadata(long ledgerId, Watcher watcher) {
    CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
    zk.getData(getLedgerPath(ledgerId), watcher, new DataCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
            if (rc == KeeperException.Code.NONODE.intValue()) {
                promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsOnMetadataServerException());
                return;
            }
            if (rc != KeeperException.Code.OK.intValue()) {
                promise.completeExceptionally(new BKException.ZKException());
                return;
            }
            if (stat == null) {
                promise.completeExceptionally(new BKException.ZKException());
                return;
            }
            try {
                // build the LedgerMetadata; the znode's data version becomes the metadata version
                LongVersion version = new LongVersion(stat.getVersion());
                LedgerMetadata metadata = serDe.parseConfig(data, Optional.of(stat.getCtime()));
                promise.complete(new Versioned<>(metadata, version));
            } catch (Throwable t) {
                promise.completeExceptionally(new BKException.ZKException());
            }
        }
    }, null);
    return promise;
}
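readLedgerMetadata follows a common pattern: wrap a callback-based zk call in a CompletableFuture and complete it exactly once, either with a value or with an exception. The sketch below shows the same pattern with hypothetical types (DataCallbackSketch, VersionedValue, AsyncReadSketch) instead of the ZooKeeper client; the result codes 0 and -101 mirror ZooKeeper's KeeperException.Code.OK and NONODE.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical callback type, standing in for ZooKeeper's DataCallback.
interface DataCallbackSketch {
    void processResult(int rc, byte[] data, int version);
}

// Hypothetical value-plus-version pair, standing in for Versioned<LedgerMetadata>.
final class VersionedValue {
    final String value;
    final int version;

    VersionedValue(String value, int version) {
        this.value = value;
        this.version = version;
    }
}

final class AsyncReadSketch {
    // Same numeric values as KeeperException.Code.OK and Code.NONODE.
    static final int OK = 0;
    static final int NONODE = -101;

    private AsyncReadSketch() {
    }

    // asyncGet starts the read and is expected to invoke the callback once.
    static CompletableFuture<VersionedValue> read(Consumer<DataCallbackSketch> asyncGet) {
        CompletableFuture<VersionedValue> promise = new CompletableFuture<>();
        asyncGet.accept((rc, data, version) -> {
            if (rc == NONODE) {
                promise.completeExceptionally(new IllegalStateException("no such ledger"));
            } else if (rc != OK) {
                promise.completeExceptionally(new IllegalStateException("zk error " + rc));
            } else {
                // Decode the payload and attach the znode version, like Versioned<>(metadata, version).
                promise.complete(new VersionedValue(new String(data, StandardCharsets.UTF_8), version));
            }
        });
        return promise;
    }
}
```

Returning the version alongside the value is what lets a later write or remove pass it back for the version check.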