業(yè)務(wù)模塊介紹
現(xiàn)在我們對整體的業(yè)務(wù)進行介紹以及演示

5. 全鏈路整體架構(gòu)
上面介紹了為什么需要全鏈路壓測,下面來看下全鏈路壓測的整體架構(gòu)。
整體架構(gòu)如下主要是對壓測客戶端的**壓測數(shù)據(jù)染色**,全鏈路中間件識別出**染色數(shù)據(jù)**,并將正常數(shù)據(jù)和壓測數(shù)據(jù)區(qū)分開,進行數(shù)據(jù)隔離,這里主要涉及到mysql數(shù)據(jù)庫,RabbitMQ,Redis,還需要處理因為hystrix線程池不能通過**ThreadLocal**傳遞染色表示的問題。

5.1 需要應(yīng)對的問題
5.1.1 業(yè)務(wù)問題
如何開展全鏈路壓測?在說這個問題前,我們先考慮下,全鏈路壓測有哪些問題比較難解決。
-
涉及的系統(tǒng)太多,牽扯的開發(fā)人員太多
在壓測過程中,做一個全鏈路的壓測一般會涉及到大量的系統(tǒng),在整個壓測過程中,光各個產(chǎn)品的人員協(xié)調(diào)就是一個比較大的工程,牽扯到太多的產(chǎn)品經(jīng)理和開發(fā)人員,如果公司對全鏈路壓測早期沒有足夠的重視,那么這個壓測工作是非常難開展的。 -
模擬的測試數(shù)據(jù)和訪問流量不真實
在壓測過程中經(jīng)常會遇到壓測后得到的數(shù)據(jù)不準確的問題,這就使得壓測出的數(shù)據(jù)參考性不強,為什么會產(chǎn)生這樣的問題?主要就是因為壓測的環(huán)境可能和生成環(huán)境存在誤差、參數(shù)存在不一樣的地方、測試數(shù)據(jù)存在不一樣的地方這些因素綜合起來導(dǎo)致測試結(jié)果的不可信。 -
壓測生產(chǎn)數(shù)據(jù)未隔離,影響生產(chǎn)環(huán)境
在全鏈路壓測過程中,壓測數(shù)據(jù)可能會影響到生產(chǎn)環(huán)境的真實數(shù)據(jù),舉個例子,電商系統(tǒng)在生產(chǎn)環(huán)境進行全鏈路壓測的時候可能會有很多壓測模擬用戶去下單,如果不做處理,直接下單的話會導(dǎo)致系統(tǒng)一下子會產(chǎn)生很多廢訂單,從而影響到庫存和生產(chǎn)訂單數(shù)據(jù),影響到日常的正常運營。
5.1.2 技術(shù)問題
5.1.2.1 探針的性能消耗
APM組件服務(wù)的影響應(yīng)該做到足夠小。
**服務(wù)調(diào)用埋點本身會帶來性能損耗,這就需要調(diào)用跟蹤的低損耗,實際中還會通過配置采樣率的方式,選擇一部分請求去分析請求路徑**。在一些高度優(yōu)化過的服務(wù),即使一點點損耗也會很容易察覺到,而且有可能迫使在線服務(wù)的部署團隊不得不將跟蹤系統(tǒng)關(guān)停。
5.1.2.2 代碼的侵入性
即也作為業(yè)務(wù)組件,應(yīng)當盡可能少入侵或者無入侵其他業(yè)務(wù)系統(tǒng),對于使用方透明,減少開發(fā)人員的負擔。
對于應(yīng)用的程序員來說,是不需要知道有跟蹤系統(tǒng)這回事的。如果一個跟蹤系統(tǒng)想生效,就必須需要依賴應(yīng)用的開發(fā)者主動配合,那么這個跟蹤系統(tǒng)也太脆弱了,往往由于跟蹤系統(tǒng)在應(yīng)用中植入代碼的bug或疏忽導(dǎo)致應(yīng)用出問題,這樣才是無法滿足對跟蹤系統(tǒng)“無所不在的部署”這個需求。
5.1.2.3 可擴展性
一個優(yōu)秀的調(diào)用跟蹤系統(tǒng)必須支持分布式部署,具備良好的可擴展性。能夠支持的組件越多當然越好?;蛘咛峁┍憬莸牟寮_發(fā)API,對于一些沒有監(jiān)控到的組件,應(yīng)用開發(fā)者也可以自行擴展。
5.1.2.4 數(shù)據(jù)的分析
數(shù)據(jù)的分析要快 ,分析的維度盡可能多**。跟蹤系統(tǒng)能提供足夠快的信息反饋,就可以對生產(chǎn)環(huán)境下的異常狀況做出快速反應(yīng)。**分析的全面,能夠避免二次開發(fā)。
5.2 全鏈路壓測核心技術(shù)
上面從總體架構(gòu)層面分析了全鏈路壓測的核心,下面就分析下全鏈路壓測用到的核心技術(shù)點
5.2.1 全鏈路流量染色
做到微服務(wù)和中間件的染色標志的穿透
通過壓測平臺對輸出的壓力請求打上標識,在訂單系統(tǒng)中提取壓測標識,確保完整的程序上下文都持有該標識,并且能夠穿透微服務(wù)以及各種中間件,比如 MQ,hystrix,F(xiàn)egin等。
5.2.2 全鏈路服務(wù)監(jiān)控
需要能夠?qū)崟r監(jiān)控服務(wù)的運行狀況以及分析服務(wù)的調(diào)用鏈,我們采用skywalking進行服務(wù)監(jiān)控和壓測分析

5.2.3 全鏈路日志隔離
做到日志隔離,防止污染生產(chǎn)日志
當訂單系統(tǒng)向磁盤或外設(shè)輸出日志時,若流量是被標記的壓測流量,則將日志隔離輸出,避免影響生產(chǎn)日志。
5.2.4 全鏈路風(fēng)險熔斷
流量控制,防止流量超載,導(dǎo)致集群不可用
當訂單系統(tǒng)訪問會員系統(tǒng)時,通過RPC協(xié)議延續(xù)壓測標識到會員系統(tǒng),兩個系統(tǒng)之間服務(wù)通訊將會有白黑名單開關(guān)來控制流量流入許可。該方案設(shè)計可以一定程度上避免下游系統(tǒng)出現(xiàn)瓶頸或不支持壓測所帶來的風(fēng)險,這里可以采用**Sentinel**來實現(xiàn)風(fēng)險熔斷。
5.3 全鏈路數(shù)據(jù)隔離
對各種存儲服務(wù)以及中間件做到數(shù)據(jù)隔離,方式數(shù)據(jù)污染
2.3.1 數(shù)據(jù)庫隔離
當會員系統(tǒng)訪問數(shù)據(jù)庫時,在持久化層同樣會根據(jù)壓測標識進行路由訪問壓測數(shù)據(jù)表。數(shù)據(jù)隔離的手段有多種,比如**影子庫**、**影子表**,或者**影子數(shù)據(jù)**,三種方案的仿真度會有一定的差異,他們的對比如下。
| 隔離性 | 兼容性 | 安全級別 | 技術(shù)難度 | |
|---|---|---|---|---|
| 影子庫 | 高 | 高 | 高 | 高 |
| 影子表 | 中 | 低 | 中 | 中 |
| 影子數(shù)據(jù) | 低 | 低 | 低 | 低 |
5.3.2 消息隊列隔離
當我們生產(chǎn)的消息扔到MQ之后,接著讓消費者進行消費,這個沒有問題,壓測的數(shù)據(jù)不能夠直接扔到MQ中的,因為它會被正常的消費者消費到的,要做好數(shù)據(jù)隔離,方案有**隊列隔離**,**消息隔離**,他們對比如下。
| 隔離性 | 兼容性 | 安全級別 | 技術(shù)難度 | |
|---|---|---|---|---|
| 隊列隔離 | 高 | 好 | 高 | 高 |
| 消息隔離 | 低 | 低 | 低 | 中 |
5.3.3 Redis 隔離
通過 key 值來區(qū)分,壓測流量的 key 值加統(tǒng)一后綴,通過改造RedisTemplate來實現(xiàn)key的路由。
框架實現(xiàn)
6.1 流量染色方案
上面分析了從整體分析了全鏈路壓測用的的核心技術(shù),下面就來實現(xiàn)第一個流量染色。
6.1.1 流量識別
要想壓測的流量和數(shù)據(jù)不影響線上真實的生產(chǎn)數(shù)據(jù),就需要線上的集群能識別出壓測的流量,只要能識別出壓測請求的流量,那么流量觸發(fā)的讀寫操作就很好統(tǒng)一去做隔離了。
全鏈路壓測發(fā)起的都是Http的請求,只需要要請求頭上添加統(tǒng)一的壓測請求頭。
通過在請求協(xié)議中添加壓測請求的標識,在不同服務(wù)的相互調(diào)用時,一路透傳下去,這樣每一個服務(wù)都能識別出壓測的請求流量,這樣做的好處是與業(yè)務(wù)完全的解耦,只需要應(yīng)用框架進行感知,對業(yè)務(wù)方代碼無侵入。

6.1.2 MVC接收數(shù)據(jù)
客戶端傳遞過來的數(shù)據(jù)可以通過獲取Header的方式獲取到,并將其設(shè)置進當前的ThreadLocal,交給后面的方法使用。
6.1.2.1 MVC攔截器實現(xiàn)
/**
* 鏈路跟蹤Request設(shè)置值
*/
public class MvcWormholeWebInterceptor implements WebRequestInterceptor {
@Override
public void preHandle(WebRequest webRequest) {
//失效上下文,解決Tomcat線程復(fù)用問題
WormholeContextHolder.invalidContext();
String wormholeValue = webRequest.getHeader(WormholeContextHolder.WORMHOLE_REQUEST_MARK);
if (StringUtils.isNotEmpty(wormholeValue)) {
WormholeContextHolder.setContext(new WormholeContext(wormholeValue));
}
}
@Override
public void postHandle(WebRequest webRequest, ModelMap modelMap) throws Exception {
}
@Override
public void afterCompletion(WebRequest webRequest, Exception e) throws Exception {
}
}
6.1.2.2 Tomcat線程復(fù)用問題
tomcat默認使用線程池來管理線程,一個請求過來,如果線程池里面有空閑的線程,那么會在線程池里面取一個線程來處理該請求,一旦該線程當前在處理請求,其他請求就不會被分配到該線程上,直到該請求處理完成。請求處理完成后,會將該線程重新加入線程池,因為是通過線程池復(fù)用線程,就會如果線程內(nèi)部的ThreadLocal沒有清除就會出現(xiàn)問題,需要新的請求進來的時候,清除ThreadLocal。
6.1.3 Fegin傳遞傳遞染色標識
我們項目的微服務(wù)是使用Fegin來實現(xiàn)遠程調(diào)用的,跨微服務(wù)傳遞染色標識是通過MVC攔截器獲取到請求Header的染色標識,并放進ThreadLocal中,然后交給Fegin攔截器在發(fā)送請求之前從ThreadLocal中獲取到染色標識,并放進Fegin構(gòu)建請求的Header中,實現(xiàn)微服務(wù)之間的火炬?zhèn)鬟f。

6.1.3.1 代碼實現(xiàn)
public class WormholeFeignRequestInterceptor implements RequestInterceptor {
@Override
public void apply(RequestTemplate requestTemplate) {
WormholeContext wormholeContext = WormholeContextHolder.getContext();
if (null != wormholeContext) {
requestTemplate.header(WormholeContextHolder.WORMHOLE_REQUEST_MARK, wormholeContext.toString());
}
}
}
6.1.4 Hystrix傳遞染色標識
6.1.4.1 Hystrix隔離技術(shù)
Hystrix 實現(xiàn)資源隔離,主要有兩種技術(shù):
信號量
信號量的資源隔離只是起到一個開關(guān)的作用,比如,服務(wù) A 的信號量大小為 10,那么就是說它同時只允許有 10 個 tomcat 線程來訪問服務(wù) A,其它的請求都會被拒絕,從而達到資源隔離和限流保護的作用。

線程池
線程池隔離技術(shù),是用 Hystrix 自己的線程去執(zhí)行調(diào)用;而信號量隔離技術(shù),是直接讓 tomcat 線程去調(diào)用依賴服務(wù)。信號量隔離,只是一道關(guān)卡,信號量有多少,就允許多少個 tomcat 線程通過它,然后去執(zhí)行。

6.1.4.2 Hystrix穿透
如果使用線程池模式,那么存在一個ThreadLocal變量跨線程傳遞的問題,即在主線程的ThreadLocal變量,無法在線程池中使用,不過Hystrix內(nèi)部提供了解決方案。

封裝Callable任務(wù)
public final class DelegatingWormholeContextCallable<V> implements Callable<V> {
private final Callable<V> delegate;
// 用戶信息上下文(根據(jù)項目實際情況定義ThreadLocal上下文)
private WormholeContext orginWormholeContext;
public DelegatingWormholeContextCallable(Callable<V> delegate,
WormholeContext wormholeContext) {
this.delegate = delegate;
this.orginWormholeContext = wormholeContext;
}
public V call() throws Exception {
//防止線程復(fù)用銷毀ThreadLocal的數(shù)據(jù)
WormholeContextHolder.invalidContext();
// 將當前的用戶上下文設(shè)置進Hystrix線程的TreadLocal中
WormholeContextHolder.setContext(orginWormholeContext);
try {
return delegate.call();
} finally {
// 執(zhí)行完畢,記得清理ThreadLocal資源
WormholeContextHolder.invalidContext();
}
}
public static <V> Callable<V> create(Callable<V> delegate,
WormholeContext wormholeContext) {
return new DelegatingWormholeContextCallable<V>(delegate, wormholeContext);
}
}
實現(xiàn)Hystrix的并發(fā)策略類
因為Hystrix默認的并發(fā)策略不支持ThreadLocal傳遞,我們可以自定義并發(fā)策略類繼承HystrixConcurrencyStrategy
public class ThreadLocalAwareStrategy extends HystrixConcurrencyStrategy {
// 最簡單的方式就是引入現(xiàn)有的并發(fā)策略,進行功能擴展
private final HystrixConcurrencyStrategy existingConcurrencyStrategy;
public ThreadLocalAwareStrategy(
HystrixConcurrencyStrategy existingConcurrencyStrategy) {
this.existingConcurrencyStrategy = existingConcurrencyStrategy;
}
@Override
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getBlockingQueue(maxQueueSize)
: super.getBlockingQueue(maxQueueSize);
}
@Override
public <T> HystrixRequestVariable<T> getRequestVariable(
HystrixRequestVariableLifecycle<T> rv) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getRequestVariable(rv)
: super.getRequestVariable(rv);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixProperty<Integer> corePoolSize,
HystrixProperty<Integer> maximumPoolSize,
HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getThreadPool(threadPoolKey, corePoolSize,
maximumPoolSize, keepAliveTime, unit, workQueue)
: super.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue);
}
@Override
public <T> Callable<T> wrapCallable(Callable<T> callable) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy
.wrapCallable(new DelegatingWormholeContextCallable<>(callable, WormholeContextHolder.getContext()))
: super.wrapCallable(new DelegatingWormholeContextCallable<T>(callable, WormholeContextHolder.getContext()));
}
}
Hystrix注入新并發(fā)策略并進行刷新
public class HystrixThreadLocalConfiguration {
@Autowired(required = false)
private HystrixConcurrencyStrategy existingConcurrencyStrategy;
@PostConstruct
public void init() {
// Keeps references of existing Hystrix plugins.
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
.getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
.getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
.getPropertiesStrategy();
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance()
.getCommandExecutionHook();
HystrixPlugins.reset();
HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalAwareStrategy(existingConcurrencyStrategy));
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
}
}
6.2 數(shù)據(jù)隔離方案
6.2.1 JDBC數(shù)據(jù)源隔離

數(shù)據(jù)隔離需要對DB,Redis,RabbitMQ進行數(shù)據(jù)隔離
通過實現(xiàn)Spring動態(tài)數(shù)據(jù)源`AbstractRoutingDataSource`,通過`ThreadLocal`識別出來壓測數(shù)據(jù),如果是壓測數(shù)據(jù)就路由到影子庫,如果是正常流量則路由到主庫,通過流量識別的改造,各個服務(wù)都已經(jīng)能夠識別出壓測的請求流量了。
6.2.1.1 代碼實現(xiàn)
數(shù)據(jù)源路由Key持有對象
根據(jù)路由Key將選擇將操作路由給那個數(shù)據(jù)源
/**
* 動態(tài)數(shù)據(jù)源上下文
*/
public class DynamicDataSourceContextHolder {
public static final String PRIMARY_DB = "primary";
public static final String SHADOW_DB = "shadow";
private static final ThreadLocal<String> contextHolder = new ThreadLocal<String>() {
/**
* 將 master 數(shù)據(jù)源的 key作為默認數(shù)據(jù)源的 key
*/
@Override
protected String initialValue() {
return PRIMARY_DB;
}
};
/**
* 數(shù)據(jù)源的 key集合,用于切換時判斷數(shù)據(jù)源是否存在
*/
public static List<Object> dataSourceKeys = new ArrayList<>();
/**
* 切換數(shù)據(jù)源
*
* @param key
*/
public static void setDataSourceKey(String key) {
contextHolder.set(key);
}
/**
* 獲取數(shù)據(jù)源
*
* @return
*/
public static String getDataSourceKey() {
return contextHolder.get();
}
/**
* 重置數(shù)據(jù)源
*/
public static void clearDataSourceKey() {
contextHolder.remove();
}
/**
* 判斷是否包含數(shù)據(jù)源
*
* @param key 數(shù)據(jù)源key
* @return
*/
public static boolean containDataSourceKey(String key) {
return dataSourceKeys.contains(key);
}
/**
* 添加數(shù)據(jù)源keys
*
* @param keys
* @return
*/
public static boolean addDataSourceKeys(Collection<? extends Object> keys) {
return dataSourceKeys.addAll(keys);
}
}
動態(tài)數(shù)據(jù)源實現(xiàn)類
根據(jù)路由Key實現(xiàn)數(shù)據(jù)源的切換
/**
* 動態(tài)數(shù)據(jù)源實現(xiàn)類
*/
public class DynamicDataSource extends AbstractRoutingDataSource {
/**
* 如果不希望數(shù)據(jù)源在啟動配置時就加載好,可以定制這個方法,從任何你希望的地方讀取并返回數(shù)據(jù)源
* 比如從數(shù)據(jù)庫、文件、外部接口等讀取數(shù)據(jù)源信息,并最終返回一個DataSource實現(xiàn)類對象即可
*/
@Override
protected DataSource determineTargetDataSource() {
//獲取當前的上下文
WormholeContext wormholeContext = WormholeContextHolder.getContext();
//如果不為空使用影子庫
if (null != wormholeContext) {
DynamicDataSourceContextHolder.setDataSourceKey(DynamicDataSourceContextHolder.SHADOW_DB);
} else {
//為空則使用主數(shù)據(jù)源
DynamicDataSourceContextHolder.setDataSourceKey(DynamicDataSourceContextHolder.PRIMARY_DB);
}
return super.determineTargetDataSource();
}
/**
* 如果希望所有數(shù)據(jù)源在啟動配置時就加載好,這里通過設(shè)置數(shù)據(jù)源Key值來切換數(shù)據(jù),定制這個方法
*/
@Override
protected Object determineCurrentLookupKey() {
return DynamicDataSourceContextHolder.getDataSourceKey();
}
}
6.2.2 Redis 數(shù)據(jù)源隔離
同時通過`ThreadLocal`識別出來壓測數(shù)據(jù),自定義Redis的主鍵的序列化方式,如果是壓測數(shù)據(jù)則在主鍵后面加上后綴,這樣就可以通過不同主鍵將Redis數(shù)據(jù)進行隔離。
6.2.2.1 實現(xiàn)key序列化
public class KeyStringRedisSerializer extends StringRedisSerializer {
@Resource
private WormholeIsolationConfiguration isolationConfiguration;
public byte[] serialize(@Nullable String redisKey) {
WormholeContext wormholeContext = WormholeContextHolder.getContext();
if (null != wormholeContext) {
redisKey = isolationConfiguration.generateIsolationKey(redisKey);
}
return super.serialize(redisKey);
}
}
6.2.2.2 配置序列化器
/**
* Redis 配置類
*/
@Configuration
@ConditionalOnClass({RedisTemplate.class, RedisOperations.class, RedisConnectionFactory.class})
public class WormholeRedisAutoConfiguration {
@Bean
public KeyStringRedisSerializer keyStringRedisSerializer() {
return new KeyStringRedisSerializer();
}
@Bean("redisTemplate")
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate template = new RedisTemplate();
//使用fastjson序列化
FastJsonRedisSerializer fastJsonRedisSerializer = new FastJsonRedisSerializer(Object.class);
// value值的序列化采用fastJsonRedisSerializer
template.setValueSerializer(fastJsonRedisSerializer);
template.setHashValueSerializer(fastJsonRedisSerializer);
// key的序列化采用StringRedisSerializer
template.setKeySerializer(keyStringRedisSerializer());
template.setHashKeySerializer(keyStringRedisSerializer());
template.setConnectionFactory(factory);
return template;
}
@Bean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) throws UnknownHostException {
StringRedisTemplate template = new StringRedisTemplate();
template.setKeySerializer(keyStringRedisSerializer());
template.setHashKeySerializer(keyStringRedisSerializer());
template.setConnectionFactory(factory);
return template;
}
}
6.2.3 RabbitMQ 數(shù)據(jù)隔離

6.2.3.1 自動創(chuàng)建影子隊列
因為SpringAMQP中的
中的關(guān)鍵方法是私有的,無法通過繼承的方式進行實現(xiàn)對以配置好的隊列進行擴展,所以需要自定義該類,來實現(xiàn)對自動創(chuàng)建影子隊列,并和交換器進行綁定

代碼實現(xiàn)
改造`RabbitListenerAnnotationBeanPostProcessor`類來實現(xiàn)創(chuàng)建MQ影子隊列以及將影子Key綁定到影子隊列。
public class WormholeRabbitListenerAnnotationBeanPostProcessor extends RabbitListenerAnnotationBeanPostProcessor {
@Resource
private WormholeIsolationConfiguration wormholeIsolationConfiguration;
/**
* routingKey 前置處理器
*
* @param queueName
* @param routingKey
* @return
*/
@Override
public String preProcessingRoutingKey(String queueName, String routingKey) {
//如果是影子隊列就將routingKey轉(zhuǎn)換為 影子routingKey
if (wormholeIsolationConfiguration.checkIsolation(queueName) && !wormholeIsolationConfiguration.checkIsolation(routingKey)) {
return wormholeIsolationConfiguration.generateIsolationKey(routingKey);
}
return routingKey;
}
/**
* 處理隊列問題,如果來了一個隊列就生成一個shadow的隊列
*
* @param queues
* @return
*/
@Override
public List<String> handelQueues(List<String> queues) {
List<String> isolationQueues = new ArrayList<>();
if (null != queues && !queues.isEmpty()) {
for (String queue : queues) {
//添加shadow隊列
isolationQueues.add(wormholeIsolationConfiguration.generateIsolationKey(queue));
}
queues.addAll(isolationQueues);
}
return queues;
}
}
6.2.3.2 傳遞染色標識
因為MQ是異步通訊,為了傳遞染色標識,會在發(fā)送MQ的時候?qū)⑷旧珮俗R傳遞過來,MQ接收到之后放進當前線程的`ThreadLocal`里面,這個需要擴展Spring的`SimpleRabbitListenerContainerFactory`來實現(xiàn)

代碼實現(xiàn)
public class WormholeSimpleRabbitListenerContainerFactory extends SimpleRabbitListenerContainerFactory {
@Override
protected SimpleMessageListenerContainer createContainerInstance() {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
simpleMessageListenerContainer.setAfterReceivePostProcessors(message -> {
//防止線程復(fù)用 銷毀ThreadLocal
WormholeContextHolder.invalidContext();
//獲取消息屬性標識
String wormholeRequestContext = message.getMessageProperties().getHeader(WormholeContextHolder.WORMHOLE_REQUEST_MARK);
if (StringUtils.isNotEmpty(wormholeRequestContext)) {
WormholeContextHolder.setContext(wormholeRequestContext);
}
return message;
});
return simpleMessageListenerContainer;
}
}
6.2.3.3 發(fā)送MQ消息處理
同上,需要傳遞染色標識,就通過繼承`RabbitTemplate`重寫`convertAndSend`方法來實現(xiàn)傳遞染色標識。
public class ShadowRabbitTemplate extends RabbitTemplate {
public ShadowRabbitTemplate(ConnectionFactory connectionFactory) {
super(connectionFactory);
}
@Autowired
private WormholeIsolationConfiguration isolationConfiguration;
@Override
public void send(final String exchange, final String routingKey,
final Message message, @Nullable final CorrelationData correlationData)
throws AmqpException {
WormholeContext wormholeContext = WormholeContextHolder.getContext();
if (null == wormholeContext) {
super.send(exchange, routingKey, message, correlationData);
} else {
message.getMessageProperties().setHeader(WormholeContextHolder.WORMHOLE_REQUEST_MARK, wormholeContext.toString());
//生成Rabbit 隔離Key
String wormholeRoutingKey = isolationConfiguration.generateIsolationKey(routingKey);
//調(diào)用父類進行發(fā)送
super.send(exchange, wormholeRoutingKey, message, correlationData);
}
}
}
6.3 接口隔離方法
6.3.1 Mock 第三方接口
對于第三方數(shù)據(jù)接口需要進行隔離,比如短信接口,正常的數(shù)據(jù)需要發(fā)送短信,對于壓測數(shù)據(jù)則不能直接調(diào)用接口發(fā)送短信,并且需要能夠識別出來壓測數(shù)據(jù),并進行MOCK接口調(diào)用。

6.3.1.1 核心類實現(xiàn)
@Aspect
public class WormholeMockSection {
/**
* 切點 攔截@WormholeMock的注解
*/
@Pointcut("@annotation(com.heima.wormhole.component.mock.annotation.WormholeMock)")
public void pointCut() {
}
/**
* 環(huán)繞通知
*
* @param point
* @return
* @throws Throwable
*/
@Around("pointCut()")
public Object section(ProceedingJoinPoint point) throws Throwable {
WormholeContext wormholeContext = WormholeContextHolder.getContext();
Object[] parameter = point.getArgs();
//如果沒有wormholeContext 就執(zhí)行正常方法
if (null == wormholeContext) {
return point.proceed(parameter);
}
//如果存在就執(zhí)行MOCK方法
WormholeMock wormholeMock = WormholeMockUtils.getMethodAnnotation(point, WormholeMock.class);
if (null != wormholeMock) {
//獲取到 Mock 回調(diào)類
WormholeMockCallback wormholeMockCallback = WormholeMockUtils.getWormholeMockCallback(wormholeMock);
if (null != wormholeMockCallback) {
return wormholeMockCallback.handelMockData(parameter);
}
}
return null;
}
}
6.3.1.2 使用方式
在具體方法上面加上注解就可以使用了
@Override
//加入注解進行MOCK測試攔截 設(shè)置最大耗時
@WormholeMock(maxDelayTime = 10, minDelayTime = 2)
public boolean send(NotifyVO notifyVO) {
logger.info("開始發(fā)送短信通知.....");
try {
//模擬發(fā)送短信耗時
Thread.sleep(5);
} catch (InterruptedException e) {
}
return true;
}
6.4 零侵入方案
如果開發(fā)的中間件需要各個微服務(wù)大量改造,對開發(fā)人員來說就是一個災(zāi)難,所以這里采用零侵入的springboot starter 來解決
6.4.1 自動裝配
使用微服務(wù)得
@Conditional來完成配置得自動裝配,這里用MVC得配置來演示自動裝配,其他得都是類似這樣可以最大限度的優(yōu)化代碼并提高很高的可擴展性。
/**
* MVC 自動裝配
*/
@Configuration
//當DispatcherServlet存在時該配置類才會被執(zhí)行到
@ConditionalOnClass(org.springframework.web.servlet.DispatcherServlet.class)
public class WormholeMVCAutoConfiguration {
@ConditionalOnClass
@Bean
public WormholeMVCConfiguration wormholeMVCConfiguration() {
return new WormholeMVCConfiguration();
}
}
6.4.1.1 Conditional 簡介
@Conditional表示僅當所有指定條件都匹配時,組件才有資格注冊 。 該@Conditional注釋可以在以下任一方式使用:
- 作為任何@Bean方法的方法級注釋
- 作為任何類的直接或間接注釋的類型級別注釋 @Component,包括@Configuration類
- 作為元注釋,目的是組成自定義構(gòu)造型注釋
6.4.1.2 Conditional派生注解
@Conditional派生了很多注解,下面給個表格列舉一下派生注解的用法
| @Conditional派生注解 | 作用(都是判斷是否符合指定的條件) |
|---|---|
| @ConditionalOnJava | 系統(tǒng)的java版本是否符合要求 |
| @ConditionalOnBean | 有指定的Bean類 |
| @ConditionalOnMissingBean | 沒有指定的bean類 |
| @ConditionalOnExpression | 符合指定的SpEL表達式 |
| @ConditionalOnClass | 有指定的類 |
| @ConditionalOnMissingClass | 沒有指定的類 |
| @ConditionalOnSingleCandidate | 容器只有一個指定的bean,或者這個bean是首選bean |
| @ConditionalOnProperty | 指定的property屬性有指定的值 |
| @ConditionalOnResource | 路徑下存在指定的資源 |
| @ConditionalOnWebApplication | 系統(tǒng)環(huán)境是web環(huán)境 |
| @ConditionalOnNotWebApplication | 系統(tǒng)環(huán)境不是web環(huán)境 |
| @ConditionalOnjndi | JNDI存在指定的項 |
6.4.2 SpringBoot starter
和自動裝配一樣,Spring Boot Starter的目的也是簡化配置,而Spring Boot Starter解決的是依賴管理配置復(fù)雜的問題,有了它,當我需要構(gòu)建一個Web應(yīng)用程序時,不必再遍歷所有的依賴包,一個一個地添加到項目的依賴管理中,而是只需要一個配置`spring-boot-starter-web`。
6.4.2.1 使用規(guī)范
在 Spring Boot starter 開發(fā)規(guī)范中,項目中會有一個空的名為 **xxx-spring-boot-starter** 的項目,這個項目主要靠 pom.xml 將所有需要的依賴引入進來。同時項目還會有一個 **xxx-spring-boot-autoconfigure** 項目,這個項目主要寫帶 @Configuration 注解的配置類,在這個類或者類中帶 @Bean 的方法上。
6.4.2.2 項目使用
在 xxx-spring-boot-starter的項目下的resources文件夾下面新建一個META-INF文件,并在下面創(chuàng)建spring.factories文件,將我們的自動配置類配置進去
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.heima.wormhole.autoconfiguration.WormholeAutoConfiguration

6.5 服務(wù)監(jiān)控方案
6.5.1 skywalking簡介
Skywalking 是一個APM系統(tǒng),即應(yīng)用性能監(jiān)控系統(tǒng),為微服務(wù)架構(gòu)和云原生架構(gòu)系統(tǒng)設(shè)計。它通過探針自動收集所需的指標,并進行分布式追蹤。通過這些調(diào)用鏈路以及指標,Skywalking APM會感知應(yīng)用間關(guān)系和服務(wù)間關(guān)系,并進行相應(yīng)的指標統(tǒng)計。目前支持鏈路追蹤和監(jiān)控應(yīng)用組件如下,基本涵蓋主流框架和容器,如國產(chǎn)PRC Dubbo和motan等,國際化的spring boot,spring cloud都支持了
SkyWalking是分布式系統(tǒng)的應(yīng)用程序性能監(jiān)視工具,專為微服務(wù)、云原生架構(gòu)和基于容器(Docker、K8S、Mesos)架構(gòu)而設(shè)計
SkyWalking是觀察性分析平臺和應(yīng)用性能管理系統(tǒng)。提供分布式追蹤、服務(wù)網(wǎng)格遙測分析、度量聚合和可視化一體化解決方案
6.5.1.1 SkyWalking組件
Skywalking Agent: 采集
tracing(調(diào)用鏈數(shù)據(jù))和metric(指標)信息并上報,上報通過HTTP或者gRPC方式發(fā)送數(shù)據(jù)到Skywalking CollectorSkywalking Collector : 鏈路數(shù)據(jù)收集器,對agent傳過來的
tracing和metric數(shù)據(jù)進行整合分析通過Analysis Core模塊處理并落入相關(guān)的數(shù)據(jù)存儲中,同時會通過Query Core模塊進行二次統(tǒng)計和監(jiān)控告警Storage: Skywalking的存儲,支持以
ElasticSearch、Mysql、TiDB、H2等作為存儲介質(zhì)進行數(shù)據(jù)存儲UI: Web可視化平臺,用來展示落地的數(shù)據(jù),目前官方采納了RocketBot作為SkyWalking的主UI
6.5.2 配置SkyWalking
6.5.2.1 下載SkyWalking
下載SkyWalking的[壓縮包](https://www.apache.org/dyn/closer.cgi/skywalking/8.2.0/apache-skywalking-apm),解壓后將壓縮包里面的**agent**文件夾放進本地磁盤,探針包含整個目錄,請不要改變目錄結(jié)構(gòu)。
6.5.2.2 Agent配置
通過了解配置,可以對一個組件功能有一個大致的了解,解壓開skywalking的壓縮包,在agent/config文件夾中可以看到agent的配置文件,從skywalking支持環(huán)境變量配置加載,在啟動的時候優(yōu)先讀取環(huán)境變量中的相關(guān)配置。
| skywalking配置名稱 | 描述 |
|---|---|
| agent.namespace | 跨進程鏈路中的header,不同的namespace會導(dǎo)致跨進程的鏈路中斷 |
| agent.service_name | 一個服務(wù)(項目)的唯一標識,這個字段決定了在sw的UI上的關(guān)于service的展示名稱 |
| agent.sample_n_per_3_secs | 客戶端采樣率,0或者負數(shù)標識禁用,默認-1 |
| agent.authentication | 與collector進行通信的安全認證,需要同collector中配置相同 |
| agent.ignore_suffix | 忽略特定請求后綴的trace |
| collecttor.backend_service | agent需要同collector進行數(shù)據(jù)傳輸?shù)腎P和端口 |
| logging.level | agent記錄日志級別 |
skywalking agent使用javaagent無侵入式的配合collector實現(xiàn)對分布式系統(tǒng)的追蹤和相關(guān)數(shù)據(jù)的上下文傳遞。
6.5.2.3 配置探針
配置SpringBoot啟動參數(shù),需要填寫如下的運行參數(shù),代碼放在后面,需要的自己粘貼。
-javaagent:D:/data/skywalking/agent/skywalking-agent.jar
-Dskywalking.agent.service_name=storage-server
-Dskywalking.collector.backend_service=172.18.0.50:11800

- javaagent:復(fù)制的agent目錄下探針的jar包路徑
- skywalking.agent.service_name:需要在skywalking顯示的服務(wù)名稱
- skywalking.collector.backend_service:skywalking服務(wù)端地址默認是11800
本文由育博學(xué)谷狂野架構(gòu)師發(fā)布
如果本文對您有幫助,歡迎關(guān)注和點贊;如果您有任何建議也可留言評論或私信,您的支持是我堅持創(chuàng)作的動力
轉(zhuǎn)載請注明出處!