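Solution Overview
Background: the original index holds 500 million documents. A new index must be created every month; new data is written to the newest index, while queries must cover all indices.
Core goal: zero changes to business code, with an alias providing transparent index management.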
Architecture Design
Index structure
Original index: voice-data (500 million documents)
    ↓ migrate
History index: voice-data-history (500 million documents, read-only)
Monthly indices: voice-data-2025-11, voice-data-2025-12, ...
Alias: voice-data (the name used by business code)
├─ voice-data-history (read)
├─ voice-data-2025-11 (read)
└─ voice-data-2025-12 (read + write)
Data flow
┌──────────────────────────┐
│ Business application     │
│ uses: voice-data         │
└────────────┬─────────────┘
             │ (transparent alias routing)
             ↓
┌────────────────────────────────────────────────────┐
│ Elasticsearch cluster                              │
│                                                    │
│ Write → voice-data-2025-12 (is_write_index=true)   │
│                                                    │
│ Query → voice-data-history                         │
│       → voice-data-2025-11                         │
│       → voice-data-2025-12                         │
└────────────────────────────────────────────────────┘
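The routing rule in the diagram can be illustrated with a toy in-memory model. This is only a sketch of the behaviour described above, not Elasticsearch code: the class AliasRoutingModel and its methods are hypothetical names, and the model assumes every member index carries an explicit is_write_index flag.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of an alias: many member indices, at most one write index. */
final class AliasRoutingModel {
    // index name -> is_write_index flag, kept in insertion order
    private final Map<String, Boolean> members = new LinkedHashMap<>();

    /** Adds or updates a member; rejects a second write index. */
    void add(String index, boolean isWriteIndex) {
        if (isWriteIndex) {
            for (Map.Entry<String, Boolean> e : members.entrySet()) {
                if (e.getValue() && !e.getKey().equals(index)) {
                    throw new IllegalStateException(
                        "alias already has write index " + e.getKey());
                }
            }
        }
        members.put(index, isWriteIndex);
    }

    /** Writes through the alias land in the single flagged index. */
    String resolveWriteTarget() {
        for (Map.Entry<String, Boolean> e : members.entrySet()) {
            if (e.getValue()) {
                return e.getKey();
            }
        }
        throw new IllegalStateException("alias has no write index");
    }

    /** Searches through the alias fan out to every member index. */
    List<String> resolveSearchTargets() {
        return new ArrayList<>(members.keySet());
    }
}
```

The model also shows why a switch has to clear the old flag before (or together with) setting the new one: adding a second write index is rejected.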
Implementation Steps
Phase 1: Preparation (no business impact)
1. Create the first monthly index
PUT /voice-data-2025-11
{
  "settings": {
    "number_of_shards": 5,
    "number_of_replicas": 1,
    "refresh_interval": "30s"
  },
  "mappings": {
    "properties": {
      "id": { "type": "keyword" },
      "timestamp": { "type": "date" },
      "content": { "type": "text" },
      "status": { "type": "keyword" }
    }
  }
}
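2. Clone the original index into the history index
# Option A: clone API (recommended, fast).
# The clone API requires the source index to be write-blocked first, so writes to
# voice-data are rejected from this point until the alias is in place (step 3).
PUT /voice-data/_settings
{
  "index.blocks.write": true
}
POST /voice-data/_clone/voice-data-history
# Option B: reindex (suited to cases where the data must be filtered; much slower at this volume)
POST /_reindex
{
  "source": { "index": "voice-data" },
  "dest": { "index": "voice-data-history" }
}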
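3. Remove the original index and create the alias
# An alias cannot share its name with an existing index, so the original index has to go.
# Removing it inside the same _aliases request keeps the swap atomic; a separate
# DELETE /voice-data would leave a window in which the name voice-data does not resolve.
# Run this only after verifying the document count in voice-data-history.
POST /_aliases
{
  "actions": [
    {
      "remove_index": {
        "index": "voice-data"
      }
    },
    {
      "add": {
        "index": "voice-data-history",
        "alias": "voice-data",
        "is_write_index": false
      }
    },
    {
      "add": {
        "index": "voice-data-2025-11",
        "alias": "voice-data",
        "is_write_index": true
      }
    }
  ]
}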
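The monthly switch later in this document needs the same request shape: one add action that clears the write flag on the previous index and one that sets it on the new index. A minimal sketch of building that body as a string follows. AliasActionsJson and its methods are hypothetical helpers, and the sketch assumes the names need no JSON escaping (Elasticsearch index names may not contain double quotes or backslashes).

```java
/** Builds the body of a POST /_aliases request for a write-index switch. */
final class AliasActionsJson {

    /** One "add" action; is_write_index is rendered as a JSON boolean. */
    static String addAction(String index, String alias, boolean isWriteIndex) {
        return String.format(
            "{\"add\":{\"index\":\"%s\",\"alias\":\"%s\",\"is_write_index\":%b}}",
            index, alias, isWriteIndex);
    }

    /** Demotes the previous write index and promotes the new one in one request. */
    static String rotationBody(String alias, String previousWriteIndex, String newWriteIndex) {
        return "{\"actions\":["
            + addAction(previousWriteIndex, alias, false) + ","
            + addAction(newWriteIndex, alias, true)
            + "]}";
    }

    public static void main(String[] args) {
        System.out.println(
            rotationBody("voice-data", "voice-data-2025-11", "voice-data-2025-12"));
    }
}
```

Because both actions travel in a single request, the alias never has zero or two write indices at any point in between.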
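4. Verify the alias configuration
# Show the alias, limited to the three relevant columns
GET /_cat/aliases/voice-data?v&h=alias,index,is_write_index
# Expected output (approximately):
# alias      index              is_write_index
# voice-data voice-data-history false
# voice-data voice-data-2025-11 true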
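This check can also be scripted. The sketch below parses cat-style text and counts the write indices; it assumes the output has a header row and exactly three whitespace-separated columns in the order alias, index, is_write_index (for example by restricting columns with the cat API's h parameter). CatAliasesCheck is a hypothetical helper, not part of any client library.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Parses three-column _cat/aliases text and counts write indices. */
final class CatAliasesCheck {

    /** Returns index name -> raw is_write_index cell ("true", "false" or "-"). */
    static Map<String, String> parse(String catOutput) {
        Map<String, String> flags = new LinkedHashMap<>();
        String[] lines = catOutput.split("\\R");
        // Start at 1 to skip the header row printed by ?v
        for (int i = 1; i < lines.length; i++) {
            String[] cols = lines[i].trim().split("\\s+");
            if (cols.length == 3) {
                flags.put(cols[1], cols[2]);
            }
        }
        return flags;
    }

    /** A healthy alias in this scheme has exactly one write index. */
    static long writeIndexCount(Map<String, String> flags) {
        return flags.values().stream().filter("true"::equals).count();
    }
}
```

A result other than 1 from writeIndexCount would mean writes through the alias are either rejected or ambiguous, so it is a reasonable condition to alert on.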
Phase 2: Verify the business code (no changes required)
Write test
// Business code stays unchanged and still uses voice-data
IndexRequest request = new IndexRequest("voice-data")
    .source(XContentType.JSON,
        "timestamp", new Date(),
        "content", "test data");
client.index(request, RequestOptions.DEFAULT);
// ES routes the write to voice-data-2025-11 (because is_write_index=true)
Query test
// Business code stays unchanged
SearchRequest request = new SearchRequest("voice-data");
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.matchAllQuery());
request.source(builder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// ES searches every index behind the alias:
// - voice-data-history
// - voice-data-2025-11
Java Automation
1. Maven dependencies
<dependencies>
  <!-- Elasticsearch Java client -->
  <dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.17.10</version>
  </dependency>
  <!-- Spring Boot web starter (includes the core starter; needed by the REST controller below) -->
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.7.14</version>
  </dependency>
  <!-- Spring Boot test starter (needed by the @SpringBootTest class below) -->
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <version>2.7.14</version>
    <scope>test</scope>
  </dependency>
  <!-- Lombok -->
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.28</version>
  </dependency>
  <!-- JSON processing -->
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.15.2</version>
  </dependency>
</dependencies>
2. Elasticsearch configuration class
package com.example.es.config;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticsearchConfig {

    @Value("${elasticsearch.host:localhost}")
    private String host;

    @Value("${elasticsearch.port:9200}")
    private int port;

    @Value("${elasticsearch.scheme:http}")
    private String scheme;

    @Bean
    public RestHighLevelClient restHighLevelClient() {
        return new RestHighLevelClient(
            RestClient.builder(new HttpHost(host, port, scheme))
                .setRequestConfigCallback(requestConfigBuilder ->
                    requestConfigBuilder
                        .setConnectTimeout(5000)
                        .setSocketTimeout(60000))
        );
    }
}
3. Index management service class
package com.example.es.service;

import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.GetAliasesResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Set;

@Slf4j
@Service
public class IndexManagementService {

    @Autowired
    private RestHighLevelClient client;

    @Value("${elasticsearch.index.prefix:voice-data}")
    private String indexPrefix;

    @Value("${elasticsearch.index.alias:voice-data}")
    private String indexAlias;

    @Value("${elasticsearch.index.shards:5}")
    private int numberOfShards;

    @Value("${elasticsearch.index.replicas:1}")
    private int numberOfReplicas;
    /**
     * Creates the index for the current month.
     * Returns false if the index already exists or creation fails.
     */
    public boolean createMonthlyIndex() {
        String indexName = getCurrentMonthIndexName();
        try {
            // Skip creation if the index already exists
            if (indexExists(indexName)) {
                log.warn("Index {} already exists, skipping creation", indexName);
                return false;
            }
            CreateIndexRequest request = new CreateIndexRequest(indexName);
            // Index settings
            request.settings(Settings.builder()
                .put("index.number_of_shards", numberOfShards)
                .put("index.number_of_replicas", numberOfReplicas)
                .put("index.refresh_interval", "30s")
                .put("index.max_result_window", 10000)
            );
            // Field mappings
            request.mapping(createMapping());
            CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
            if (response.isAcknowledged()) {
                log.info("Index {} created", indexName);
                return true;
            } else {
                log.error("Creation of index {} was not acknowledged", indexName);
                return false;
            }
        } catch (IOException e) {
            log.error("Exception while creating index {}", indexName, e);
            return false;
        }
    }
    /**
     * Updates the alias: clears the write flag on last month's index and
     * makes the current month's index the write index, in a single request.
     */
    public boolean updateAliasForNewMonth() {
        String newIndex = getCurrentMonthIndexName();
        String lastMonthIndex = getLastMonthIndexName();
        try {
            IndicesAliasesRequest request = new IndicesAliasesRequest();
            // 1. If last month's index exists, clear its write flag
            if (indexExists(lastMonthIndex)) {
                request.addAliasAction(
                    IndicesAliasesRequest.AliasActions.add()
                        .index(lastMonthIndex)
                        .alias(indexAlias)
                        .writeIndex(false)
                );
                log.info("Clearing the write flag on last month's index {}", lastMonthIndex);
            }
            // 2. Add the new index to the alias and make it the write index
            request.addAliasAction(
                IndicesAliasesRequest.AliasActions.add()
                    .index(newIndex)
                    .alias(indexAlias)
                    .writeIndex(true)
            );
            AcknowledgedResponse response = client.indices().updateAliases(request, RequestOptions.DEFAULT);
            if (response.isAcknowledged()) {
                log.info("Alias {} updated, write index switched to {}", indexAlias, newIndex);
                return true;
            } else {
                log.error("Alias update was not acknowledged");
                return false;
            }
        } catch (IOException e) {
            log.error("Exception while updating the alias", e);
            return false;
        }
    }
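    /**
     * Performs the monthly rotation (create the index, then update the alias).
     */
    public boolean performMonthlyRotation() {
        log.info("========== Starting monthly index rotation ==========");
        // 1. Create the new monthly index unless it already exists
        //    (for example because it was pre-created through the /create endpoint);
        //    an existing index must not abort the rotation.
        String newIndex = getCurrentMonthIndexName();
        if (!indexExists(newIndex) && !createMonthlyIndex()) {
            log.error("Monthly index creation failed, aborting rotation");
            return false;
        }
        // 2. Update the alias
        boolean aliasUpdated = updateAliasForNewMonth();
        if (!aliasUpdated) {
            log.error("Alias update failed, but the new index exists");
            return false;
        }
        // 3. Optional: make last month's index read-only
        String lastMonthIndex = getLastMonthIndexName();
        if (indexExists(lastMonthIndex)) {
            setIndexReadOnly(lastMonthIndex);
        }
        log.info("========== Monthly index rotation finished ==========");
        return true;
    }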
    /**
     * Logs the current alias configuration.
     */
    public void printAliasInfo() {
        try {
            GetAliasesRequest request = new GetAliasesRequest(indexAlias);
            GetAliasesResponse response = client.indices().getAlias(request, RequestOptions.DEFAULT);
            Map<String, Set<AliasMetadata>> aliases = response.getAliases();
            log.info("========== Current configuration of alias {} ==========", indexAlias);
            aliases.forEach((indexName, aliasMetadataSet) -> {
                aliasMetadataSet.forEach(aliasMetadata -> {
                    // writeIndex() is null when the flag was never set
                    Boolean isWriteIndex = aliasMetadata.writeIndex();
                    log.info("index: {}, alias: {}, write index: {}",
                        indexName, aliasMetadata.alias(),
                        isWriteIndex != null && isWriteIndex ? "yes" : "-");
                });
            });
            log.info("==========================================");
        } catch (IOException e) {
            log.error("Exception while querying alias information", e);
        }
    }
    /**
     * Blocks writes on an index (index.blocks.write=true); reads stay available.
     */
    private void setIndexReadOnly(String indexName) {
        try {
            // The high-level REST client's putSettings call takes an UpdateSettingsRequest
            org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest request =
                new org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest(indexName);
            request.settings(Settings.builder()
                .put("index.blocks.write", true)
                .build());
            AcknowledgedResponse response = client.indices().putSettings(request, RequestOptions.DEFAULT);
            if (response.isAcknowledged()) {
                log.info("Index {} is now read-only", indexName);
            }
        } catch (IOException e) {
            log.error("Exception while making index {} read-only", indexName, e);
        }
    }
    /**
     * Checks whether an index exists. Returns false if the check itself fails.
     */
    private boolean indexExists(String indexName) {
        try {
            GetIndexRequest request = new GetIndexRequest(indexName);
            return client.indices().exists(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("Exception while checking whether index {} exists", indexName, e);
            return false;
        }
    }
    /**
     * Builds the index mapping.
     * The ik_max_word and ik_smart analyzers require the IK analysis plugin on the cluster.
     * This mapping differs from the one used for voice-data-2025-11 in Phase 1
     * (extra fields, explicit analyzers and date format); align the two if the
     * monthly indices are meant to be identical.
     */
    private XContentBuilder createMapping() throws IOException {
        return XContentFactory.jsonBuilder()
            .startObject()
                .startObject("properties")
                    .startObject("id")
                        .field("type", "keyword")
                    .endObject()
                    .startObject("timestamp")
                        .field("type", "date")
                        .field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis")
                    .endObject()
                    .startObject("content")
                        .field("type", "text")
                        .field("analyzer", "ik_max_word")
                        .field("search_analyzer", "ik_smart")
                    .endObject()
                    .startObject("status")
                        .field("type", "keyword")
                    .endObject()
                    .startObject("userId")
                        .field("type", "keyword")
                    .endObject()
                    .startObject("createTime")
                        .field("type", "date")
                    .endObject()
                .endObject()
            .endObject();
    }
    /**
     * Returns the index name for the current month, e.g. voice-data-2025-12.
     * Uses the JVM's default time zone.
     */
    private String getCurrentMonthIndexName() {
        String yearMonth = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM"));
        return indexPrefix + "-" + yearMonth;
    }

    /**
     * Returns the index name for the previous month.
     */
    private String getLastMonthIndexName() {
        String yearMonth = LocalDate.now().minusMonths(1).format(DateTimeFormatter.ofPattern("yyyy-MM"));
        return indexPrefix + "-" + yearMonth;
    }
}
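The two name helpers above call LocalDate.now() directly, which makes the month boundary hard to test. One possible variation, sketched here under the assumption that the naming scheme stays prefix-yyyy-MM, injects a java.time.Clock. MonthlyIndexNamer is a hypothetical class and is not part of the service above.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.YearMonth;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Derives monthly index names from an injectable clock. */
final class MonthlyIndexNamer {
    private static final DateTimeFormatter SUFFIX = DateTimeFormatter.ofPattern("yyyy-MM");

    private final String prefix;
    private final Clock clock;

    MonthlyIndexNamer(String prefix, Clock clock) {
        this.prefix = prefix;
        this.clock = clock;
    }

    /** Name of the index for the month the clock is currently in. */
    String current() {
        return nameFor(YearMonth.now(clock));
    }

    /** Name of the index for the month before the current one. */
    String previous() {
        return nameFor(YearMonth.now(clock).minusMonths(1));
    }

    String nameFor(YearMonth month) {
        return prefix + "-" + month.format(SUFFIX);
    }

    public static void main(String[] args) {
        // A fixed clock just after a year boundary, in UTC
        Clock fixed = Clock.fixed(Instant.parse("2026-01-01T00:05:00Z"), ZoneOffset.UTC);
        MonthlyIndexNamer namer = new MonthlyIndexNamer("voice-data", fixed);
        System.out.println(namer.current());
        System.out.println(namer.previous());
    }
}
```

With a fixed clock, the year rollover (December to January) can be checked in a plain unit test, and the time zone used for the month boundary becomes an explicit choice rather than the JVM default.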
4. Scheduled task class
package com.example.es.scheduler;

import com.example.es.service.IndexManagementService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

@Slf4j
@Component
public class IndexRotationScheduler {

    @Autowired
    private IndexManagementService indexManagementService;

    /**
     * Runs the index rotation at 00:05 on the 1st of every month.
     * Cron fields: second minute hour day-of-month month day-of-week
     */
    @Scheduled(cron = "0 5 0 1 * ?")
    public void monthlyIndexRotation() {
        String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        log.info("========================================");
        log.info("Scheduled task triggered at: {}", timestamp);
        log.info("========================================");
        try {
            // Run the monthly rotation
            boolean success = indexManagementService.performMonthlyRotation();
            if (success) {
                log.info("Monthly index rotation succeeded");
                // Log the current alias configuration
                indexManagementService.printAliasInfo();
                // Optional: send a success notification
                sendSuccessNotification();
            } else {
                log.error("Monthly index rotation failed");
                // Send an alert
                sendAlertNotification("Monthly index rotation failed");
            }
        } catch (Exception e) {
            log.error("Exception during monthly index rotation", e);
            sendAlertNotification("Monthly index rotation exception: " + e.getMessage());
        }
    }

    /**
     * Logs the alias state every day at 01:00 (optional).
     */
    @Scheduled(cron = "0 0 1 * * ?")
    public void dailyIndexHealthCheck() {
        log.info("Running daily index health check...");
        indexManagementService.printAliasInfo();
    }

    /**
     * Sends a success notification.
     */
    private void sendSuccessNotification() {
        // TODO: implement the notification (email, DingTalk, WeCom, etc.)
        log.info("Sending success notification");
    }

    /**
     * Sends an alert notification.
     */
    private void sendAlertNotification(String message) {
        // TODO: implement the alerting logic
        log.error("Sending alert notification: {}", message);
    }
}
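To reason about when the monthly cron expression "0 5 0 1 * ?" fires, the schedule can be restated with plain java.time arithmetic. The sketch below is not Spring's cron implementation; MonthlyTrigger is a hypothetical helper that returns the first instant strictly after a given time that falls on day 1 at 00:05:00.

```java
import java.time.LocalDateTime;

/** Restates "00:05:00 on the 1st of every month" without a cron parser. */
final class MonthlyTrigger {

    /** First fire time strictly after {@code now}. */
    static LocalDateTime nextFire(LocalDateTime now) {
        // Fire time within the month that contains 'now'
        LocalDateTime candidate = now
            .withDayOfMonth(1)
            .withHour(0)
            .withMinute(5)
            .withSecond(0)
            .withNano(0);
        // If that moment has passed (or is exactly now), move to next month
        return candidate.isAfter(now) ? candidate : candidate.plusMonths(1);
    }

    public static void main(String[] args) {
        System.out.println(nextFire(LocalDateTime.of(2025, 11, 12, 10, 0)));
    }
}
```

Note the five-minute gap after midnight: documents written between 00:00 and 00:05 on the 1st still land in the previous month's index, because the alias has not been switched yet. The local time here corresponds to the server's default zone; @Scheduled also accepts a zone attribute if a specific time zone is wanted.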
5. Manual trigger endpoints (convenient for testing)
package com.example.es.controller;

import com.example.es.service.IndexManagementService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@RestController
@RequestMapping("/api/index")
public class IndexManagementController {

    @Autowired
    private IndexManagementService indexManagementService;

    /**
     * Manually triggers the monthly index rotation.
     */
    @PostMapping("/rotate")
    public Map<String, Object> manualRotation() {
        Map<String, Object> result = new HashMap<>();
        try {
            log.info("Received a manual index rotation request");
            boolean success = indexManagementService.performMonthlyRotation();
            result.put("success", success);
            result.put("message", success ? "Index rotation succeeded" : "Index rotation failed");
            return result;
        } catch (Exception e) {
            log.error("Manual index rotation failed", e);
            result.put("success", false);
            result.put("message", "Index rotation exception: " + e.getMessage());
            return result;
        }
    }

    /**
     * Creates the monthly index only, without switching the alias.
     */
    @PostMapping("/create")
    public Map<String, Object> createMonthlyIndex() {
        Map<String, Object> result = new HashMap<>();
        boolean success = indexManagementService.createMonthlyIndex();
        result.put("success", success);
        result.put("message", success ? "Index created" : "Index creation failed");
        return result;
    }

    /**
     * Writes the current alias configuration to the application log.
     */
    @GetMapping("/alias/info")
    public Map<String, Object> getAliasInfo() {
        Map<String, Object> result = new HashMap<>();
        indexManagementService.printAliasInfo();
        result.put("success", true);
        result.put("message", "See the log output");
        return result;
    }

    /**
     * Health check.
     */
    @GetMapping("/health")
    public Map<String, Object> healthCheck() {
        Map<String, Object> result = new HashMap<>();
        result.put("status", "ok");
        result.put("timestamp", System.currentTimeMillis());
        return result;
    }
}
6. Configuration file
# application.yml
server:
  port: 8080
spring:
  application:
    name: elasticsearch-index-manager
  # Scheduling itself is enabled by @EnableScheduling on the application class;
  # this setting only sizes the scheduler thread pool.
  task:
    scheduling:
      pool:
        size: 1
# Elasticsearch settings
elasticsearch:
  host: localhost
  port: 9200
  scheme: http
  # Index settings
  index:
    prefix: voice-data   # index name prefix
    alias: voice-data    # alias (the name used by business code)
    shards: 5            # number of primary shards
    replicas: 1          # number of replicas
# Logging
logging:
  level:
    com.example.es: INFO
    org.elasticsearch: WARN
  pattern:
    console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"
7. Application class
package com.example.es;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@EnableScheduling
@SpringBootApplication
public class ElasticsearchIndexManagerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ElasticsearchIndexManagerApplication.class, args);
        System.out.println("======================================");
        System.out.println(" Elasticsearch index management service started");
        System.out.println(" Scheduled task: index rotation at 00:05 on the 1st of each month");
        System.out.println("======================================");
    }
}
8. Test class
package com.example.es;

import com.example.es.service.IndexManagementService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

// These tests run against the configured Elasticsearch cluster and change its state.
@SpringBootTest
public class IndexManagementTest {

    @Autowired
    private IndexManagementService indexManagementService;

    @Test
    public void testCreateMonthlyIndex() {
        boolean result = indexManagementService.createMonthlyIndex();
        System.out.println("Create index result: " + (result ? "success" : "failure"));
    }

    @Test
    public void testUpdateAlias() {
        boolean result = indexManagementService.updateAliasForNewMonth();
        System.out.println("Update alias result: " + (result ? "success" : "failure"));
    }

    @Test
    public void testPerformRotation() {
        boolean result = indexManagementService.performMonthlyRotation();
        System.out.println("Index rotation result: " + (result ? "success" : "failure"));
    }

    @Test
    public void testPrintAliasInfo() {
        indexManagementService.printAliasInfo();
    }
}
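The tests above need a live cluster. One way to cover the rotation logic without one, sketched here as an assumption rather than as part of the original service, is to separate the decision (which steps are needed) from the execution (the Elasticsearch calls). RotationPlanner and its Step enum are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Decides which rotation steps are needed, given the observed cluster state. */
final class RotationPlanner {

    enum Step { CREATE_INDEX, SWITCH_WRITE_ALIAS, BLOCK_WRITES_ON_PREVIOUS }

    /**
     * @param existingIndices   names of indices that currently exist
     * @param currentWriteIndex index flagged as write index, or null if none
     * @param targetIndex       index that should receive writes this month
     */
    static List<Step> plan(Set<String> existingIndices,
                           String currentWriteIndex,
                           String targetIndex) {
        List<Step> steps = new ArrayList<>();
        // Create the target only if it is missing (a pre-created index is fine)
        if (!existingIndices.contains(targetIndex)) {
            steps.add(Step.CREATE_INDEX);
        }
        // Switch the alias only if the target is not already the write index
        if (!targetIndex.equals(currentWriteIndex)) {
            steps.add(Step.SWITCH_WRITE_ALIAS);
            if (currentWriteIndex != null) {
                steps.add(Step.BLOCK_WRITES_ON_PREVIOUS);
            }
        }
        return steps;
    }
}
```

A plan of this kind is idempotent: running the rotation twice in the same month yields an empty step list the second time, so a manual trigger followed by the scheduled run does no harm.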
Sample Run
Startup log
2025-11-12 10:00:00 [main] INFO c.e.es.ElasticsearchIndexManagerApplication - Starting...
2025-11-12 10:00:02 [main] INFO o.e.c.RestHighLevelClient - Elasticsearch client initialized
======================================
 Elasticsearch index management service started
 Scheduled task: index rotation at 00:05 on the 1st of each month
======================================
Scheduled task log
========================================
Scheduled task triggered at: 2025-12-01 00:05:00
========================================
========== Starting monthly index rotation ==========
Index voice-data-2025-12 created
Clearing the write flag on last month's index voice-data-2025-11
Alias voice-data updated, write index switched to voice-data-2025-12
Index voice-data-2025-11 is now read-only
========== Monthly index rotation finished ==========
Monthly index rotation succeeded
========== Current configuration of alias voice-data ==========
index: voice-data-history, alias: voice-data, write index: -
index: voice-data-2025-11, alias: voice-data, write index: -
index: voice-data-2025-12, alias: voice-data, write index: yes
==========================================
Sending success notification
Manual trigger test
# Create the index only
curl -X POST http://localhost:8080/api/index/create
# Response:
{
  "success": true,
  "message": "Index created"
}
# Run the full rotation
curl -X POST http://localhost:8080/api/index/rotate
# Response:
{
  "success": true,
  "message": "Index rotation succeeded"
}
# Show the alias information
curl http://localhost:8080/api/index/alias/info
# Response:
{
  "success": true,
  "message": "See the log output"
}
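Business Code Examples (no changes required)
Writing data
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Service
public class VoiceDataService {

    @Autowired
    private RestHighLevelClient client;

    // Business code is untouched and still uses voice-data
    public void saveVoiceData(VoiceData data) throws IOException {
        // Document IDs are unique per index, not across the alias: saving an ID that
        // already exists in an older index creates a second document in the write index.
        IndexRequest request = new IndexRequest("voice-data") // the alias
            .id(data.getId())
            .source(XContentType.JSON,
                "timestamp", data.getTimestamp(),
                "content", data.getContent(),
                "status", data.getStatus());
        IndexResponse response = client.index(request, RequestOptions.DEFAULT);
        // ES routes the write to the current write index (voice-data-2025-12)
    }
}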
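Querying data
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.List;

@Service
public class VoiceSearchService {

    @Autowired
    private RestHighLevelClient client;

    // Query code is untouched as well
    public List<VoiceData> searchVoiceData(String keyword) throws IOException {
        SearchRequest request = new SearchRequest("voice-data"); // the alias
        SearchSourceBuilder builder = new SearchSourceBuilder();
        builder.query(QueryBuilders.matchQuery("content", keyword));
        builder.size(100);
        builder.sort("timestamp", SortOrder.DESC);
        request.source(builder);
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // ES searches every index behind the alias:
        // - voice-data-history
        // - voice-data-2025-11
        // - voice-data-2025-12
        // Parse the results (parseSearchHits is application-specific and not shown here)
        return parseSearchHits(response.getHits());
    }
}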
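Aggregation statistics
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.Map;

@Service
public class VoiceStatisticsService {

    @Autowired
    private RestHighLevelClient client;

    // Aggregations need no changes either
    public Map<String, Long> getMonthlyStatistics() throws IOException {
        SearchRequest request = new SearchRequest("voice-data"); // the alias
        SearchSourceBuilder builder = new SearchSourceBuilder();
        builder.size(0); // only aggregation results are needed, no hits
        builder.aggregation(
            AggregationBuilders.dateHistogram("monthly")
                .field("timestamp")
                .calendarInterval(DateHistogramInterval.MONTH)
        );
        request.source(builder);
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // ES runs the aggregation on every index behind the alias and merges the results
        // Parse the aggregation (parseAggregation is application-specific and not shown here)
        return parseAggregation(response.getAggregations());
    }
}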
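Conceptually, merging the aggregation across indices amounts to summing the counts of buckets that share a key. The sketch below illustrates that idea on plain maps; it is not how Elasticsearch implements the merge internally, and MonthlyCountMerge is a hypothetical helper. It assumes bucket keys are yyyy-MM strings.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Sums per-index monthly counts into one sorted result. */
final class MonthlyCountMerge {

    /** Buckets with the same month key are added together. */
    static Map<String, Long> merge(List<Map<String, Long>> perIndexCounts) {
        // TreeMap keeps months in chronological order for yyyy-MM keys
        Map<String, Long> merged = new TreeMap<>();
        for (Map<String, Long> counts : perIndexCounts) {
            counts.forEach((month, n) -> merged.merge(month, n, Long::sum));
        }
        return merged;
    }
}
```

The same month can appear in more than one index, for instance documents written in the first minutes of a month before the alias switches, which is why the merge adds counts instead of overwriting them.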