Internal implementation analysis
First, RestController's registerHandler function is called to register RestSplitIndexAction as the handler for the split API.
RestSplitIndexAction
controller.registerHandler(RestRequest.Method.PUT, "/{index}/_split/{target}", this);
controller.registerHandler(RestRequest.Method.POST, "/{index}/_split/{target}", this);
When Elasticsearch receives a request, it enters RestController's dispatchRequest function.
RestController
boolean dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client,
final Optional<RestHandler> mHandler) throws Exception {
final int contentLength = request.hasContent() ? request.content().length() : 0;
RestChannel responseChannel = channel;
// Indicator of whether a response was sent or not
boolean requestHandled;
if (contentLength > 0 && mHandler.map(h -> hasContentType(request, h) == false).orElse(false)) {
...
} else if (contentLength > 0 && mHandler.map(h -> h.supportsContentStream()).orElse(false) &&
request.getXContentType() != XContentType.JSON && request.getXContentType() != XContentType.SMILE) {
...
} else if (mHandler.isPresent()) {
// the split request is handled in this branch
try {
if (canTripCircuitBreaker(mHandler)) {
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
} else {
inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
}
// iff we could reserve bytes for the request we need to send the response also over this channel
responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
final RestHandler wrappedHandler = mHandler.map(h -> handlerWrapper.apply(h)).get();
// wrappedHandler is essentially the handler registered in the RestController, i.e. RestSplitIndexAction
wrappedHandler.handleRequest(request, responseChannel, client);
requestHandled = true;
} catch (Exception e) {
responseChannel.sendResponse(new BytesRestResponse(responseChannel, e));
// We "handled" the request by returning a response, even though it was an error
requestHandled = true;
}
} else {
...
}
// Return true if the request was handled, false otherwise.
return requestHandled;
}
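The code above omits some details. The part that matters is the mHandler.isPresent() branch: because the split handler was registered with the RestController at the start, mHandler is never empty for this route. handleRequest is actually a method of BaseRestHandler, so let's step into it.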
BaseRestHandler
public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
// prepare the request for execution; has the side effect of touching the request parameters
final RestChannelConsumer action = prepareRequest(request, client);
...
usageCount.increment();
// execute the action
// the real execution happens here: invoke the RestChannelConsumer returned by prepareRequest and pass the channel to it
action.accept(channel);
}
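This method first calls prepareRequest, which returns a RestChannelConsumer. Elasticsearch uses many consumer interfaces of this kind; they are functional interfaces as introduced in Java 8 (1.8). Such an interface has a single accept function that takes one argument. The consumer used here is the one returned by prepareRequest, and RestSplitIndexAction overrides that method.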
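The prepare-then-accept pattern described above can be sketched in isolation. This is a minimal illustration, not Elasticsearch code: `ChannelConsumer`, `prepare` and `handle` are hypothetical names, and a plain `List<String>` stands in for the REST channel.

```java
import java.util.ArrayList;
import java.util.List;

final class ConsumerSketch {
    /** Hypothetical stand-in for RestChannelConsumer: one abstract method, so a lambda can implement it. */
    @FunctionalInterface
    interface ChannelConsumer {
        void accept(List<String> channel);
    }

    /** Plays the role of prepareRequest: validates eagerly, defers the real work into the returned lambda. */
    static ChannelConsumer prepare(String source, String target) {
        if (target == null) {
            throw new IllegalArgumentException("no target index");
        }
        if (source == null) {
            throw new IllegalArgumentException("no source index");
        }
        // Nothing is written to the channel yet; that happens when accept is called.
        return channel -> channel.add("split " + source + " -> " + target);
    }

    /** Plays the role of handleRequest: builds the consumer first, then runs it against the channel. */
    static void handle(String source, String target, List<String> channel) {
        ChannelConsumer action = prepare(source, target);
        action.accept(channel);
    }

    public static void main(String[] args) {
        List<String> channel = new ArrayList<>();
        handle("logs", "logs-split", channel);
        System.out.println(channel);
    }
}
```

The point of the split is that parameter validation fails fast in `prepare`, while the work that needs the channel is postponed until the caller has one to hand over.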
RestSplitIndexAction
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
if (request.param("target") == null) {
throw new IllegalArgumentException("no target index");
}
if (request.param("index") == null) {
throw new IllegalArgumentException("no source index");
}
ResizeRequest shrinkIndexRequest = new ResizeRequest(request.param("target"), request.param("index"));
shrinkIndexRequest.setResizeType(ResizeType.SPLIT);
request.applyContentParser(parser -> ResizeRequest.PARSER.parse(parser, shrinkIndexRequest, null));
shrinkIndexRequest.timeout(request.paramAsTime("timeout", shrinkIndexRequest.timeout()));
shrinkIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", shrinkIndexRequest.masterNodeTimeout()));
shrinkIndexRequest.setWaitForActiveShards(ActiveShardCount.parseString(request.param("wait_for_active_shards")));
return channel -> client.admin().indices().resizeIndex(shrinkIndexRequest, new AcknowledgedRestListener<ResizeResponse>(channel) {
@Override
public void addCustomFields(XContentBuilder builder, ResizeResponse response) throws IOException {
response.addCustomFields(builder);
}
});
}
This method constructs a ResizeRequest object from the source and target index names and sets the resize type to split. Elasticsearch supports two resize operations: split, which splits shards, and shrink, which merges shards. After setting a few timeout-related parameters, the method returns an anonymous function (a lambda). handleRequest in BaseRestHandler eventually calls this function, which in turn enters the resizeIndex function in IndicesAdmin.
IndicesAdmin
public void resizeIndex(ResizeRequest request, ActionListener<ResizeResponse> listener) {
execute(ResizeAction.INSTANCE, request, listener);
}
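This function calls execute and passes in a ResizeAction object. In Elasticsearch, every externally exposed API is handled by an internal action object. Because Elasticsearch offers both a RESTful interface and an RPC interface (the transport client), there are two sets of actions. A call that arrives through the RESTful interface is first handled by an action whose name starts with Rest (the mapping between REST endpoints and their actions is maintained by RestController). A conversion layer in between then finds the corresponding action whose name starts with Transport to do the actual work. For the split API, the request is first handled by RestSplitIndexAction and then enters resizeIndex in IndicesAdmin, which directly executes ResizeAction.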
IndicesAdmin
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(
Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
client.execute(action, request, listener);
}
The client's execute method is then called.
AbstractClient
public final <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(
Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
listener = threadedWrapper.wrap(listener);
doExecute(action, request, listener);
}
doExecute is called next. Because the client in IndicesAdmin is a NodeClient, this goes straight into NodeClient's doExecute method.
NodeClient
public < Request extends ActionRequest,
Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>
> void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
// Discard the task because the Client interface doesn't use it.
executeLocally(action, request, listener);
}
That in turn calls the executeLocally function.
public < Request extends ActionRequest,
Response extends ActionResponse
> Task executeLocally(GenericAction<Request, Response> action, Request request, ActionListener<Response> listener) {
return transportAction(action).execute(request, listener);
}
Here the transportAction function is called first, with ResizeAction passed in. This step resolves the transport-layer handler.
private < Request extends ActionRequest,
Response extends ActionResponse
> TransportAction<Request, Response> transportAction(GenericAction<Request, Response> action) {
if (actions == null) {
throw new IllegalStateException("NodeClient has not been initialized");
}
// look up the transport-layer action that handles the given action
TransportAction<Request, Response> transportAction = actions.get(action);
if (transportAction == null) {
throw new IllegalStateException("failed to find action [" + action + "] to execute");
}
return transportAction;
}
actions is simply a map. setupActions in ActionModule registers every transport-layer action into it.
ActionModule
static Map<String, ActionHandler<?, ?>> setupActions(List<ActionPlugin> actionPlugins) {
// Subclass NamedRegistry for easy registration
class ActionRegistry extends NamedRegistry<ActionHandler<?, ?>> {
ActionRegistry() {
super("action");
}
public void register(ActionHandler<?, ?> handler) {
register(handler.getAction().name(), handler);
}
public <Request extends ActionRequest, Response extends ActionResponse> void register(
GenericAction<Request, Response> action, Class<? extends TransportAction<Request, Response>> transportAction,
Class<?>... supportTransportActions) {
register(new ActionHandler<>(action, transportAction, supportTransportActions));
}
}
ActionRegistry actions = new ActionRegistry();
...
// many other action registrations are omitted here
actions.register(ResizeAction.INSTANCE, TransportResizeAction.class);
...
actionPlugins.stream().flatMap(p -> p.getActions().stream()).forEach(actions::register);
return unmodifiableMap(actions.getRegistry());
}
As shown, ResizeAction is handled by TransportResizeAction. Back in NodeClient's executeLocally function: once the matching transport action has been found, its execute method is called.
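The register-then-look-up flow can be reduced to a small sketch. It is a simplification under stated assumptions: `ActionRegistrySketch` is hypothetical, the registry is keyed by a plain action name, and a `Function<String, String>` stands in for a transport action.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class ActionRegistrySketch {
    // Hypothetical registry: action name -> handler standing in for a transport action.
    private final Map<String, Function<String, String>> actions = new HashMap<>();

    /** Registers a handler once; a second registration under the same name is rejected. */
    void register(String actionName, Function<String, String> handler) {
        if (actions.putIfAbsent(actionName, handler) != null) {
            throw new IllegalArgumentException("action [" + actionName + "] already registered");
        }
    }

    /** Resolves the handler for the action and runs it, failing loudly when none is registered. */
    String execute(String actionName, String request) {
        Function<String, String> handler = actions.get(actionName);
        if (handler == null) {
            throw new IllegalStateException("failed to find action [" + actionName + "] to execute");
        }
        return handler.apply(request);
    }

    public static void main(String[] args) {
        ActionRegistrySketch registry = new ActionRegistrySketch();
        registry.register("resize", request -> "resized " + request);
        System.out.println(registry.execute("resize", "logs"));
    }
}
```

Keeping the REST-facing side and the executing side connected only through this lookup is what lets one action definition serve several entry points.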
TransportAction
public final Task execute(Request request, ActionListener<Response> listener) {
/*
* While this version of execute could delegate to the TaskListener
* version of execute that'd add yet another layer of wrapping on the
* listener and prevent us from using the listener bare if there isn't a
* task. That just seems like too many objects. Thus the two versions of
* this method.
*/
Task task = taskManager.register("transport", actionName, request);
if (task == null) {
execute(null, request, listener);
} else {
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
taskManager.unregister(task);
listener.onResponse(response);
}
@Override
public void onFailure(Exception e) {
taskManager.unregister(task);
listener.onFailure(e);
}
});
}
return task;
}
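A task is first registered with the taskManager. A task is a wrapper around a unit of work and records its type, creation time, the action being executed, the task id and the parent task information. Once the task exists, the other overloaded execute function is called with a re-wrapped listener. The listener is wrapped so that it calls taskManager's unregister function and removes the task when the request completes, whether it succeeds or fails.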
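The listener wrapping described above is a general cleanup pattern. The following sketch shows it with hypothetical types: `Listener` and `Tasks` are simplified stand-ins, not the Elasticsearch `ActionListener` or `TaskManager`.

```java
import java.util.HashSet;
import java.util.Set;

final class TaskCleanupSketch {
    /** Hypothetical two-callback listener, modelled on the shape used in the walkthrough. */
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    /** Hypothetical, much-simplified task registry. */
    static final class Tasks {
        private final Set<Long> running = new HashSet<>();
        private long nextId = 0;

        long register() {
            long id = ++nextId;
            running.add(id);
            return id;
        }

        void unregister(long id) {
            running.remove(id);
        }

        int runningCount() {
            return running.size();
        }
    }

    /** Wraps the delegate so the task is unregistered on both the success and the failure path. */
    static <T> Listener<T> unregisterOnCompletion(Tasks tasks, long taskId, Listener<T> delegate) {
        return new Listener<T>() {
            @Override
            public void onResponse(T response) {
                tasks.unregister(taskId);
                delegate.onResponse(response);
            }

            @Override
            public void onFailure(Exception e) {
                tasks.unregister(taskId);
                delegate.onFailure(e);
            }
        };
    }

    public static void main(String[] args) {
        Tasks tasks = new Tasks();
        long id = tasks.register();
        Listener<String> wrapped = unregisterOnCompletion(tasks, id, new Listener<String>() {
            @Override
            public void onResponse(String response) {
                System.out.println("ok: " + response);
            }

            @Override
            public void onFailure(Exception e) {
                System.out.println("failed: " + e.getMessage());
            }
        });
        wrapped.onResponse("acknowledged");
        System.out.println("running tasks: " + tasks.runningCount());
    }
}
```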
public final void execute(Task task, Request request, ActionListener<Response> listener) {
ActionRequestValidationException validationException = request.validate();
if (validationException != null) {
listener.onFailure(validationException);
return;
}
if (task != null && request.getShouldStoreResult()) {
listener = new TaskResultStoringActionListener<>(taskManager, task, listener);
}
RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);
requestFilterChain.proceed(task, actionName, request, listener);
}
execute first calls request.validate to check that the request is valid. If validation passes, it constructs a RequestFilterChain object and calls its proceed method.
RequestFilterChain
public void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {
int i = index.getAndIncrement();
try {
if (i < this.action.filters.length) {
this.action.filters[i].apply(task, actionName, request, listener, this);
} else if (i == this.action.filters.length) {
this.action.doExecute(task, request, listener);
} else {
listener.onFailure(new IllegalStateException("proceed was called too many times"));
}
} catch(Exception e) {
logger.trace("Error during transport action execution.", e);
listener.onFailure(e);
}
}
The action holds a filters array containing all of its filters. If there are any, they are invoked one after another until the action's doExecute method is finally called. No filters are set for TransportResizeAction here, so action.doExecute is called directly. Because TransportResizeAction extends TransportMasterNodeAction, this ends up in TransportMasterNodeAction's doExecute method.
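To make the index-driven chain easier to follow, here is a reduced sketch of the same idea. `Filter`, `Chain` and `run` are hypothetical names, and a `List<String>` trace replaces the listener; it only illustrates how each call to proceed advances to the next filter and finally to the action itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class FilterChainSketch {
    /** Hypothetical filter: may do work, then decides whether to continue the chain. */
    interface Filter {
        void apply(String request, List<String> trace, Chain chain);
    }

    static final class Chain {
        private final List<Filter> filters;
        private final AtomicInteger index = new AtomicInteger();

        Chain(List<Filter> filters) {
            this.filters = filters;
        }

        void proceed(String request, List<String> trace) {
            int i = index.getAndIncrement();
            if (i < filters.size()) {
                // Hand over to the next filter; it calls proceed again to continue.
                filters.get(i).apply(request, trace, this);
            } else if (i == filters.size()) {
                // All filters have run: this stands in for action.doExecute.
                trace.add("doExecute(" + request + ")");
            } else {
                throw new IllegalStateException("proceed was called too many times");
            }
        }
    }

    static List<String> run(List<Filter> filters, String request) {
        List<String> trace = new ArrayList<>();
        new Chain(filters).proceed(request, trace);
        return trace;
    }

    public static void main(String[] args) {
        Filter logging = (request, trace, chain) -> {
            trace.add("filter");
            chain.proceed(request, trace);
        };
        System.out.println(run(Collections.singletonList(logging), "resize"));
        System.out.println(run(Collections.<Filter>emptyList(), "resize"));
    }
}
```

With an empty filter list the very first proceed call reaches the execute step, which is the case the walkthrough describes for the resize action.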
TransportMasterNodeAction
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
new AsyncSingleAction(task, request, listener).start();
}
An AsyncSingleAction object is created here and its start method is called.
AsyncSingleAction
public void start() {
ClusterState state = clusterService.state();
this.observer = new ClusterStateObserver(state, clusterService, request.masterNodeTimeout(), logger, threadPool.getThreadContext());
doStart(state);
}
It first fetches the current cluster state and then calls the doStart method.
protected void doStart(ClusterState clusterState) {
final Predicate<ClusterState> masterChangePredicate = MasterNodeChangePredicate.build(clusterState);
final DiscoveryNodes nodes = clusterState.nodes();
if (nodes.isLocalNodeElectedMaster() || localExecute(request)) {
// check for block, if blocked, retry, else, execute locally
final ClusterBlockException blockException = checkBlock(request, clusterState);
if (blockException != null) {
if (!blockException.retryable()) {
listener.onFailure(blockException);
} else {
logger.trace("can't execute due to a cluster block, retrying", blockException);
retry(blockException, newState -> {
ClusterBlockException newException = checkBlock(request, newState);
return (newException == null || !newException.retryable());
});
}
} else {
ActionListener<Response> delegate = new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
listener.onResponse(response);
}
@Override
public void onFailure(Exception t) {
if (t instanceof Discovery.FailedToCommitClusterStateException
|| (t instanceof NotMasterException)) {
logger.debug((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", actionName), t);
retry(t, masterChangePredicate);
} else {
listener.onFailure(t);
}
}
};
threadPool.executor(executor).execute(new ActionRunnable(delegate) {
@Override
protected void doRun() throws Exception {
masterOperation(task, request, clusterState, delegate);
}
});
}
} else {
if (nodes.getMasterNode() == null) {
logger.debug("no known master node, scheduling a retry");
retry(null, masterChangePredicate);
} else {
DiscoveryNode masterNode = nodes.getMasterNode();
final String actionName = getMasterActionName(masterNode);
transportService.sendRequest(masterNode, actionName, request, new ActionListenerResponseHandler<Response>(listener,
TransportMasterNodeAction.this::newResponse) {
@Override
public void handleException(final TransportException exp) {
Throwable cause = exp.unwrapCause();
if (cause instanceof ConnectTransportException) {
// we want to retry here a bit to see if a new master is elected
logger.debug("connection exception while trying to forward request with action name [{}] to master node [{}], scheduling a retry. Error: [{}]",
actionName, nodes.getMasterNode(), exp.getDetailedMessage());
retry(cause, masterChangePredicate);
} else {
listener.onFailure(exp);
}
}
});
}
}
}
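This first checks whether the local node is the master. If it is not, the master node is looked up and the request is forwarded to it for execution; otherwise the request is executed locally. For local execution the listener is wrapped once more, this time so that the operation can be retried on failure. With the preparation done, an ActionRunnable object is created and its run method is executed. Note that execution is still synchronous at this point, because the executor's execute method simply calls command.run() on the calling thread.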
EsExecutor
public void execute(Runnable command) {
command.run();
}
AbstractRunnable
public final void run() {
try {
doRun();
} catch (Exception t) {
onFailure(t);
} finally {
onAfter();
}
}
ActionRunnable
protected void doRun() throws Exception {
masterOperation(task, request, clusterState, delegate);
}
TransportMasterNodeAction
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
masterOperation(request, state, listener);
}
The task information is simply ignored here, and TransportResizeAction's masterOperation method is eventually called.
TransportResizeAction
protected void masterOperation(final ResizeRequest resizeRequest, final ClusterState state,
final ActionListener<ResizeResponse> listener) {
// there is no need to fetch docs stats for split but we keep it simple and do it anyway for simplicity of the code
final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getSourceIndex());
final String targetIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getTargetIndexRequest().index());
client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true).execute(new ActionListener<IndicesStatsResponse>() {
@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state,
(i) -> {
IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i);
return shard == null ? null : shard.getPrimary().getDocs();
}, sourceIndex, targetIndex);
createIndexService.createIndex(
updateRequest,
ActionListener.wrap(response ->
listener.onResponse(new ResizeResponse(response.isAcknowledged(), response.isShardsAcked(),
updateRequest.index())), listener::onFailure
)
);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
At first glance another task seems to be executed here; it is in fact there to fetch the index stats. These stats appear to matter only when the shrink API is called, so they are not analysed here. Once the stats have been fetched, the listener's onResponse function is called. Note that by the time onResponse runs, execution is already on a different thread. onResponse then immediately calls the prepareCreateIndexRequest function.
static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final ResizeRequest resizeRequest, final ClusterState state
, final IntFunction<DocsStats> perShardDocStats, String sourceIndexName, String targetIndexName) {
// some code is omitted here
...
// for each shard in the target index
for (int i = 0; i < numShards; i++) {
if (resizeRequest.getResizeType() == ResizeType.SHRINK) {
Set<ShardId> shardIds = IndexMetaData.selectShrinkShards(i, metaData, numShards);
long count = 0;
for (ShardId id : shardIds) {
DocsStats docsStats = perShardDocStats.apply(id.id());
if (docsStats != null) {
count += docsStats.getCount();
}
if (count > IndexWriter.MAX_DOCS) {
throw new IllegalStateException("Can't merge index with more than [" + IndexWriter.MAX_DOCS
+ "] docs - too many documents in shards " + shardIds);
}
}
} else {
// select a source shard for each shard in the target index; an exception is thrown if the source shard is null
Objects.requireNonNull(IndexMetaData.selectSplitShard(i, metaData, numShards));
// we just execute this to ensure we get the right exceptions if the number of shards is wrong or less then etc.
}
}
if (IndexMetaData.INDEX_ROUTING_PARTITION_SIZE_SETTING.exists(targetIndexSettings)) {
throw new IllegalArgumentException("cannot provide a routing partition size value when resizing an index");
}
if (IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(targetIndexSettings)) {
throw new IllegalArgumentException("cannot provide index.number_of_routing_shards on resize");
}
String cause = resizeRequest.getResizeType().name().toLowerCase(Locale.ROOT) + "_index";
targetIndex.cause(cause);
Settings.Builder settingsBuilder = Settings.builder().put(targetIndexSettings);
settingsBuilder.put("index.number_of_shards", numShards);
targetIndex.settings(settingsBuilder);
return new CreateIndexClusterStateUpdateRequest(targetIndex,
cause, targetIndex.index(), targetIndexName, true)
// mappings are updated on the node when creating in the shards, this prevents race-conditions since all mapping must be
// applied once we took the snapshot and if somebody messes things up and switches the index read/write and adds docs we miss
// the mappings for everything is corrupted and hard to debug
.ackTimeout(targetIndex.timeout())
.masterNodeTimeout(targetIndex.masterNodeTimeout())
.settings(targetIndex.settings())
.aliases(targetIndex.aliases())
.customs(targetIndex.customs())
.waitForActiveShards(targetIndex.waitForActiveShards())
.recoverFrom(metaData.getIndex())
.resizeType(resizeRequest.getResizeType());
}
This function is fairly long. Its most important step is this line:
Objects.requireNonNull(IndexMetaData.selectSplitShard(i, metaData, numShards));
This line reveals how Elasticsearch performs a split, that is, which shard of the source index each shard of the target index is split from.
IndexMetaData
public static ShardId selectSplitShard(int shardId, IndexMetaData sourceIndexMetadata, int numTargetShards) {
if (shardId >= numTargetShards) {
throw new IllegalArgumentException("the number of target shards (" + numTargetShards + ") must be greater than the shard id: "
+ shardId);
}
int numSourceShards = sourceIndexMetadata.getNumberOfShards();
if (numSourceShards > numTargetShards) {
throw new IllegalArgumentException("the number of source shards [" + numSourceShards
+ "] must be less that the number of target shards [" + numTargetShards + "]");
}
int routingFactor = getRoutingFactor(numSourceShards, numTargetShards);
// now we verify that the numRoutingShards is valid in the source index
int routingNumShards = sourceIndexMetadata.getRoutingNumShards();
if (routingNumShards % numTargetShards != 0) {
throw new IllegalStateException("the number of routing shards ["
+ routingNumShards + "] must be a multiple of the target shards [" + numTargetShards + "]");
}
// this is just an additional assertion that ensures we are a factor of the routing num shards.
assert getRoutingFactor(numTargetShards, sourceIndexMetadata.getRoutingNumShards()) >= 0;
return new ShardId(sourceIndexMetadata.getIndex(), shardId/routingFactor);
}
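The function takes three arguments: the id of a shard in the target index, the metadata of the source index, and the total number of shards in the target index. It first computes routingFactor, which here is the factor by which the split multiplies the shard count, i.e. numTargetShards / numSourceShards. routingFactor is computed again in another place later on, but there it means something different. From the return value, the final expression (with integer division) is: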
sourceShardId = targetShardId / (numTargetShards / numSourceShards)
For example, if the source index has two shards and is split into four, the target shard ids map to source shard ids as follows:
| Source shard | Target shard |
|---|---|
| 0 | 0 |
| 0 | 1 |
| 1 | 2 |
| 1 | 3 |
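The table can be reproduced with a few lines of arithmetic. This is a minimal sketch of the formula only: `selectSourceShard` is a hypothetical helper, and the divisibility check is an assumption standing in for the validation done by getRoutingFactor and the routing-shards check, neither of which is modelled here.

```java
final class SplitShardSketch {
    /** Returns the source shard id that the given target shard is split from. */
    static int selectSourceShard(int targetShardId, int numSourceShards, int numTargetShards) {
        if (targetShardId >= numTargetShards) {
            throw new IllegalArgumentException("the number of target shards (" + numTargetShards
                + ") must be greater than the shard id: " + targetShardId);
        }
        if (numSourceShards > numTargetShards) {
            throw new IllegalArgumentException("the number of source shards [" + numSourceShards
                + "] must be less than the number of target shards [" + numTargetShards + "]");
        }
        // Assumption for this sketch: the target count must be a whole multiple of the source count.
        if (numTargetShards % numSourceShards != 0) {
            throw new IllegalArgumentException("the number of target shards must be a multiple of the source shards");
        }
        int routingFactor = numTargetShards / numSourceShards;
        // Integer division groups consecutive target shards under one source shard.
        return targetShardId / routingFactor;
    }

    public static void main(String[] args) {
        for (int target = 0; target < 4; target++) {
            System.out.println("target " + target + " <- source " + selectSourceShard(target, 2, 4));
        }
    }
}
```

Because the division truncates, target shards 0 and 1 both resolve to source shard 0, and target shards 2 and 3 to source shard 1, matching the table.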
Back in prepareCreateIndexRequest: once validation passes, a CreateIndexClusterStateUpdateRequest object is created. As the name suggests, it is a cluster state update request, specifically one that creates an index. Several properties are then set on it; the most important, in my view, is recoverFrom, which determines where the target index gets its data from. Returning further to masterOperation: the result of prepareCreateIndexRequest is assigned to updateRequest and passed to createIndexService's createIndex function. The listener is wrapped yet again, this time to replace the response object with a ResizeResponse. createIndexService is actually a MetaDataCreateIndexService object, which is responsible for index-creation requests.
MetaDataCreateIndexService
public void createIndex(final CreateIndexClusterStateUpdateRequest request,
final ActionListener<CreateIndexClusterStateUpdateResponse> listener) {
onlyCreateIndex(request, ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
activeShardsObserver.waitForActiveShards(new String[]{request.index()}, request.waitForActiveShards(), request.ackTimeout(),
shardsAcked -> {
if (shardsAcked == false) {
logger.debug("[{}] index created, but the operation timed out while waiting for " +
"enough shards to be started.", request.index());
}
listener.onResponse(new CreateIndexClusterStateUpdateResponse(response.isAcknowledged(), shardsAcked));
}, listener::onFailure);
} else {
listener.onResponse(new CreateIndexClusterStateUpdateResponse(false, false));
}
}, listener::onFailure));
}
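This merely wraps the listener once more in order to check whether the cluster state update was acknowledged; if so, it waits for the requested number of shards to become active. As you can see, Elasticsearch relies heavily on asynchronous code. The many layers of listener wrapping make it easy to lose track of the flow, and comments are sparse, so the Elasticsearch code base is fairly hard to read. Moving on: after the listener has been wrapped again, control enters onlyCreateIndex which, as its name suggests, only creates the index. Creating the index and waiting for its shards to become active are therefore separate asynchronous steps: the index may be created successfully while its shards have not yet been started.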
private void onlyCreateIndex(final CreateIndexClusterStateUpdateRequest request,
final ActionListener<ClusterStateUpdateResponse> listener) {
Settings.Builder updatedSettingsBuilder = Settings.builder();
Settings build = updatedSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX).build();
indexScopedSettings.validate(build, true); // we do validate here - index setting must be consistent
request.settings(build);
clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]",
new IndexCreationTask(logger, allocationService, request, listener, indicesService, aliasValidator, xContentRegistry, settings,
this::validate));
}
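At this point a state update task is submitted to the cluster service, together with a source string that states the reason for the operation and an IndexCreationTask that wraps the work.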
ClusterService
public <T extends ClusterStateTaskConfig & ClusterStateTaskExecutor<T> & ClusterStateTaskListener>
void submitStateUpdateTask(String source, T updateTask) {
submitStateUpdateTask(source, updateTask, updateTask, updateTask, updateTask);
}
public <T> void submitStateUpdateTask(String source, T task,
ClusterStateTaskConfig config,
ClusterStateTaskExecutor<T> executor,
ClusterStateTaskListener listener) {
submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor);
}
public <T> void submitStateUpdateTasks(final String source,
final Map<T, ClusterStateTaskListener> tasks, final ClusterStateTaskConfig config,
final ClusterStateTaskExecutor<T> executor) {
masterService.submitStateUpdateTasks(source, tasks, config, executor);
}
From here the call is delegated to MasterService.
MasterService
public <T> void submitStateUpdateTasks(final String source,
final Map<T, ClusterStateTaskListener> tasks, final ClusterStateTaskConfig config,
final ClusterStateTaskExecutor<T> executor) {
if (!lifecycle.started()) {
return;
}
try {
List<Batcher.UpdateTask> safeTasks = tasks.entrySet().stream()
.map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue()), executor))
.collect(Collectors.toList());
taskBatcher.submitTasks(safeTasks, config.timeout());
} catch (EsRejectedExecutionException e) {
// ignore cases where we are shutting down..., there is really nothing interesting
// to be done here...
if (!lifecycle.stoppedOrClosed()) {
throw e;
}
}
}
This uses the stream syntax of Java 8 (1.8); it simply wraps each task in an UpdateTask object and submits the tasks through TaskBatcher.
UpdateTask
UpdateTask(Priority priority, String source, Object task, ClusterStateTaskListener listener,
ClusterStateTaskExecutor<?> executor) {
super(priority, source, executor, task);
this.listener = listener;
}
TaskBatcher
public void submitTasks(List<? extends BatchedTask> tasks, @Nullable TimeValue timeout) throws EsRejectedExecutionException {
if (tasks.isEmpty()) {
return;
}
final BatchedTask firstTask = tasks.get(0);
assert tasks.stream().allMatch(t -> t.batchingKey == firstTask.batchingKey) :
"tasks submitted in a batch should share the same batching key: " + tasks;
// convert to an identity map to check for dups based on task identity
final Map<Object, BatchedTask> tasksIdentity = tasks.stream().collect(Collectors.toMap(
BatchedTask::getTask,
Function.identity(),
(a, b) -> { throw new IllegalStateException("cannot add duplicate task: " + a); },
IdentityHashMap::new));
synchronized (tasksPerBatchingKey) {
LinkedHashSet<BatchedTask> existingTasks = tasksPerBatchingKey.computeIfAbsent(firstTask.batchingKey,
k -> new LinkedHashSet<>(tasks.size()));
for (BatchedTask existing : existingTasks) {
// check that there won't be two tasks with the same identity for the same batching key
BatchedTask duplicateTask = tasksIdentity.get(existing.getTask());
if (duplicateTask != null) {
throw new IllegalStateException("task [" + duplicateTask.describeTasks(
Collections.singletonList(existing)) + "] with source [" + duplicateTask.source + "] is already queued");
}
}
existingTasks.addAll(tasks);
}
if (timeout != null) {
threadExecutor.execute(firstTask, timeout, () -> onTimeoutInternal(tasks, timeout));
} else {
threadExecutor.execute(firstTask);
}
}
This function checks for duplicates twice. The first check looks for duplicates among the tasks passed in; the second checks whether any newly submitted task duplicates one that was submitted earlier. tasksPerBatchingKey holds all tasks that share the same batchingKey. The task is then handed to the executor.
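The first duplicate check relies on collecting into an IdentityHashMap with a merge function that throws. The sketch below isolates that technique; `indexByIdentity` is a hypothetical helper and plain strings stand in for tasks.

```java
import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class DuplicateTaskSketch {
    /** Indexes tasks by reference identity; the same instance appearing twice is rejected. */
    static <T> Map<T, T> indexByIdentity(List<T> tasks) {
        return tasks.stream().collect(Collectors.toMap(
            Function.identity(),
            Function.identity(),
            // Called only when two elements map to the same key, i.e. the same instance.
            (a, b) -> { throw new IllegalStateException("cannot add duplicate task: " + a); },
            IdentityHashMap::new));
    }

    public static void main(String[] args) {
        // Two distinct objects that are equal by value are both accepted.
        String first = new String("task");
        String second = new String("task");
        System.out.println(indexByIdentity(Arrays.asList(first, second)).size());

        // The same object submitted twice is detected.
        try {
            indexByIdentity(Arrays.asList(first, first));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Using identity rather than equals means two separate task objects with equal contents are treated as different tasks; only resubmitting the very same object counts as a duplicate.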
PrioritizedEsThreadPoolExecutor
public void execute(Runnable command, final TimeValue timeout, final Runnable timeoutCallback) {
command = wrapRunnable(command);
doExecute(command);
if (timeout.nanos() >= 0) {
if (command instanceof TieBreakingPrioritizedRunnable) {
((TieBreakingPrioritizedRunnable) command).scheduleTimeout(timer, timeoutCallback, timeout);
} else {
// We really shouldn't be here. The only way we can get here if somebody created PrioritizedFutureTask
// and passed it to execute, which doesn't make much sense
throw new UnsupportedOperationException("Execute with timeout is not supported for future tasks");
}
}
}
The command is wrapped first, so let's look at what wrapRunnable actually does.
protected Runnable wrapRunnable(Runnable command) {
if (command instanceof PrioritizedRunnable) {
if ((command instanceof TieBreakingPrioritizedRunnable)) {
return command;
}
Priority priority = ((PrioritizedRunnable) command).priority();
// the UpdateTask object ends up wrapped in this object
return new TieBreakingPrioritizedRunnable(super.wrapRunnable(command), priority, insertionOrder.incrementAndGet());
} else if (command instanceof PrioritizedFutureTask) {
return command;
} else { // it might be a callable wrapper...
if (command instanceof TieBreakingPrioritizedRunnable) {
return command;
}
return new TieBreakingPrioritizedRunnable(super.wrapRunnable(command), Priority.NORMAL, insertionOrder.incrementAndGet());
}
}
command is the UpdateTask object mentioned earlier. It is a prioritized runnable, so it ends up wrapped in a TieBreakingPrioritizedRunnable, which implements Runnable's run method:
TieBreakingPrioritizedRunnable
public void run() {
synchronized (this) {
// make the task as stared. This is needed for synchronization with the timeout handling
// see #scheduleTimeout()
started = true;
FutureUtils.cancel(timeoutFuture);
}
runAndClean(runnable);
}
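The name TieBreakingPrioritizedRunnable and the insertionOrder counter passed to its constructor suggest ordering by priority first and by submission order second. The sketch below shows that ordering idea with a standard PriorityBlockingQueue; the `Priority` enum and `Entry` class are hypothetical and not the Elasticsearch types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

final class TieBreakingSketch {
    /** Hypothetical priorities; an earlier constant is more urgent. */
    enum Priority { URGENT, NORMAL, LOW }

    static final class Entry implements Comparable<Entry> {
        final Priority priority;
        final long insertionOrder;
        final String name;

        Entry(Priority priority, long insertionOrder, String name) {
            this.priority = priority;
            this.insertionOrder = insertionOrder;
            this.name = name;
        }

        @Override
        public int compareTo(Entry other) {
            int byPriority = priority.compareTo(other.priority);
            // Equal priority: the entry submitted earlier wins the tie.
            return byPriority != 0 ? byPriority : Long.compare(insertionOrder, other.insertionOrder);
        }
    }

    private final AtomicLong insertionOrder = new AtomicLong();
    private final PriorityBlockingQueue<Entry> queue = new PriorityBlockingQueue<>();

    void submit(Priority priority, String name) {
        queue.add(new Entry(priority, insertionOrder.incrementAndGet(), name));
    }

    /** Removes all entries in the order a single worker thread would run them. */
    List<String> drainInExecutionOrder() {
        List<String> order = new ArrayList<>();
        Entry next;
        while ((next = queue.poll()) != null) {
            order.add(next.name);
        }
        return order;
    }

    public static void main(String[] args) {
        TieBreakingSketch sketch = new TieBreakingSketch();
        sketch.submit(Priority.NORMAL, "a");
        sketch.submit(Priority.NORMAL, "b");
        sketch.submit(Priority.URGENT, "c");
        System.out.println(sketch.drainInExecutionOrder());
    }
}
```

Without the second comparison key, a priority queue gives no ordering guarantee among entries of equal priority, so tasks of the same priority could run out of submission order.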
Back in the execute method: after wrapping the command it calls doExecute directly, which is a method of EsThreadPoolExecutor, the parent class of PrioritizedEsThreadPoolExecutor.
EsThreadPoolExecutor
protected void doExecute(final Runnable command) {
try {
super.execute(command);
} catch (EsRejectedExecutionException ex) {
if (command instanceof AbstractRunnable) {
// If we are an abstract runnable we can handle the rejection
// directly and don't need to rethrow it.
try {
((AbstractRunnable) command).onRejection(ex);
} finally {
((AbstractRunnable) command).onAfter();
}
} else {
throw ex;
}
}
}
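EsThreadPoolExecutor extends Java's ThreadPoolExecutor; the executors in Elasticsearch will be analysed separately later. Here doExecute calls the parent class's execute method, which finally submits the task to the thread pool. Control then returns to the execute method, where, if a timeout was set, the timeout callback is scheduled. As the run method shown earlier indicates, that timeout is cancelled once the task starts running.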
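The rejection handling in doExecute can be tried out against a plain ThreadPoolExecutor. In this sketch `RejectableTask` is a hypothetical interface standing in for AbstractRunnable, and a pool that has already been shut down is used to force a rejection deterministically.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class RejectionSketch {
    /** Hypothetical task type that wants to be told when the pool refuses it. */
    interface RejectableTask extends Runnable {
        void onRejection(Exception e);
    }

    /** Submits the command; a rejection is routed to the task itself when it can handle one. */
    static void doExecute(ThreadPoolExecutor pool, Runnable command) {
        try {
            pool.execute(command);
        } catch (RejectedExecutionException ex) {
            if (command instanceof RejectableTask) {
                ((RejectableTask) command).onRejection(ex);
            } else {
                throw ex;
            }
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        // A pool that is shut down rejects every new task with the default abort policy.
        pool.shutdown();
        doExecute(pool, new RejectableTask() {
            @Override
            public void run() {
                System.out.println("running");
            }

            @Override
            public void onRejection(Exception e) {
                System.out.println("rejected: " + e.getClass().getSimpleName());
            }
        });
    }
}
```

Routing the rejection to the task keeps the failure on the same callback path as any other error, so the caller does not need a separate try/catch around every submission.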
At this point the index-creation task has been submitted. The next article will analyse how the task is executed.