Java開源框架中的設(shè)計模式以及應(yīng)用場景

前言

設(shè)計模式是軟件設(shè)計中常見問題的典型解決方案,你可以通過對其進(jìn)行定制來解決代碼中的特定設(shè)計問題。

關(guān)于設(shè)計模式,網(wǎng)上有很多講解。但大部分都是Demo示例,看完有可能還是不知道怎么用。

本文筆者將從設(shè)計模式入手,看一看在優(yōu)秀的Java框架/中間件產(chǎn)品中,不同的設(shè)計模式應(yīng)用場景在哪里。

一、單例模式

單例模式是 Java 中最簡單的設(shè)計模式之一,它提供了一種創(chuàng)建對象的最佳方式。這種模式涉及到一個單一的類,該類負(fù)責(zé)創(chuàng)建自己的對象,同時確保只有單個對象被創(chuàng)建。這個類提供了一種訪問其唯一的對象的方式,可以直接訪問,不需要實(shí)例化該類的對象。

單例模式雖然很簡單,但它的花樣一點(diǎn)都不少,我們一一來看。

1、餓漢式

餓漢式,顧名思義,就是我很餓,迫不及待。不管有沒有人用,我先創(chuàng)建了再說。

比如在Dubbo中的這段代碼,創(chuàng)建一個配置管理器。

public class ConfigManager {
    private static final ConfigManager configManager = new ConfigManager(); 
    private ConfigManager() {}
    public static ConfigManager getInstance() {
        return configManager;
    }
}

又或者在RocketMQ中,創(chuàng)建一個MQ客戶端實(shí)例的時候。

public class MQClientManager {

    private static MQClientManager instance = new MQClientManager();
    private MQClientManager() {}
    public static MQClientManager getInstance() {
        return instance;
    }
}

2、懶漢式

懶漢式是對應(yīng)餓漢式而言的。它旨在第一次調(diào)用才初始化,避免內(nèi)存浪費(fèi)。但為了線程安全和性能,一般都會使用雙重檢查鎖的方式來創(chuàng)建。

我們來看Seata框架中,通過這種方式來創(chuàng)建一個配置類。

public class ConfigurationFactory{

    private static volatile Configuration CONFIG_INSTANCE = null;
    public static Configuration getInstance() {
        if (CONFIG_INSTANCE == null) {
            synchronized (Configuration.class) {
                if (CONFIG_INSTANCE == null) {
                    CONFIG_INSTANCE = buildConfiguration();
                }
            }
        }
        return CONFIG_INSTANCE;
    }
}

3、靜態(tài)內(nèi)部類

可以看到,通過雙重檢查鎖的方式來創(chuàng)建單例對象,還是比較復(fù)雜的。又是加鎖,又是判斷兩次,還需要加volatile修飾的。

使用靜態(tài)內(nèi)部類的方式,可以達(dá)到雙重檢查鎖相同的功效,但實(shí)現(xiàn)上簡單了。

在Seata框架中,創(chuàng)建RM事件處理程序器的時候,就使用了靜態(tài)內(nèi)部類的方式來創(chuàng)建單例對象。

public class DefaultRMHandler extends AbstractRMHandler{

    protected DefaultRMHandler() {
        initRMHandlers();
    }
    private static class SingletonHolder {
        private static AbstractRMHandler INSTANCE = new DefaultRMHandler();
    }
    public static AbstractRMHandler get() {
        return DefaultRMHandler.SingletonHolder.INSTANCE;
    }
}

還有可以通過枚舉的方式來創(chuàng)建單例對象,但這種方式并沒有被廣泛采用,至少筆者在常見的開源框架中沒見過,所以就不再列舉。

有人說,餓漢式的單例模式不好,不能做到延遲加載,浪費(fèi)內(nèi)存。但筆者認(rèn)為似乎過于吹毛求疵,事實(shí)上很多開源框架中,用的最多的就是這種方式。

如果明確希望實(shí)現(xiàn)懶加載效果時,可以考慮用靜態(tài)內(nèi)部類的方式;如果還有其他特殊的需求,比如創(chuàng)建對象的過程比較繁瑣,可以用雙重檢查鎖的方式。

二、工廠模式

工廠模式是 Java 中最常用的設(shè)計模式之一。這種類型的設(shè)計模式屬于創(chuàng)建型模式,它提供了一種創(chuàng)建對象的最佳方式。

簡單來說,在工廠模式中,就是代替new實(shí)例化具體類的一種模式。

1、簡單工廠

簡單工廠,的確比較簡單,它的作用就是把對象的創(chuàng)建放到一個工廠類中,通過參數(shù)來創(chuàng)建不同的對象。

在分布式事務(wù)框架Seata中,如果發(fā)生異常,則需要進(jìn)行二階段回滾。

它的過程是,通過事務(wù)id找到undoLog記錄,然后解析里面的數(shù)據(jù)生成SQL,將一階段執(zhí)行的SQL給撤銷掉。

問題是SQL的種類包含了比如INSERT、UPDATE、DELETE,所以它們反解析的過程也不一樣,就需要不同的執(zhí)行器去解析。

在Seata中,有一個抽象的撤銷執(zhí)行器,可以生成一條SQL。

public abstract class AbstractUndoExecutor{
    //生成撤銷SQL
    protected abstract String buildUndoSQL();
}

然后有一個獲取撤銷執(zhí)行器的工廠,根據(jù)SQL的類型,創(chuàng)建不同類型的執(zhí)行器并返回。

public class UndoExecutorFactory {

    public static AbstractUndoExecutor getUndoExecutor(String dbType, SQLUndoLog sqlUndoLog) {
        switch (sqlUndoLog.getSqlType()) {
            case INSERT:
                return new MySQLUndoInsertExecutor(sqlUndoLog);
            case UPDATE:
                return new MySQLUndoUpdateExecutor(sqlUndoLog);
            case DELETE:
                return new MySQLUndoDeleteExecutor(sqlUndoLog);
            default:
                throw new ShouldNeverHappenException();
        }
    }
}   

使用的時候,直接通過工廠類獲取執(zhí)行器。

AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(),sqlUndoLog);
undoExecutor.executeOn(conn);

簡單工廠模式的優(yōu)點(diǎn),想必各位都能領(lǐng)會,我們不再贅述。但它還有個小小的缺點(diǎn):

一旦有了新的實(shí)現(xiàn)類,就需要修改工廠實(shí)現(xiàn),有可能造成工廠邏輯過于復(fù)雜,不利于系統(tǒng)的擴(kuò)展和維護(hù)。

2、工廠方法

工廠方法模式解決了上面那個問題。它可以創(chuàng)建一個工廠接口和多個工廠實(shí)現(xiàn)類,這樣如果增加新的功能,只需要添加新的工廠類就可以,不需要修改之前的代碼。

另外,工廠方法模式還可以和模板方法模式結(jié)合一起,將他們共同的基礎(chǔ)邏輯抽取到父類中,其它的交給子類去實(shí)現(xiàn)。

在Dubbo中,有一個關(guān)于緩存的設(shè)計完美的體現(xiàn)了工廠方法模式+模板方法模式。

首先,有一個緩存的接口,它提供了設(shè)置緩存和獲取緩存兩個方法。

public interface Cache {
    void put(Object key, Object value);
    Object get(Object key);
}

然后呢,還有一個緩存工廠,它返回一個緩存的實(shí)現(xiàn)。

public interface CacheFactory {
    Cache getCache(URL url, Invocation invocation);
}

由于結(jié)合了模板方法模式,所以Dubbo又搞了個抽象的緩存工廠類,它實(shí)現(xiàn)了緩存工廠的接口。

public abstract class AbstractCacheFactory implements CacheFactory {
    
    //具體的緩存實(shí)現(xiàn)類
    private final ConcurrentMap<String, Cache> caches = new ConcurrentHashMap<String, Cache>();
    
    @Override
    public Cache getCache(URL url, Invocation invocation) {
        url = url.addParameter(Constants.METHOD_KEY, invocation.getMethodName());
        String key = url.toFullString();
        Cache cache = caches.get(key);
        if (cache == null) {
            //創(chuàng)建緩存實(shí)現(xiàn)類,交給子類實(shí)現(xiàn)
            caches.put(key, createCache(url));
            cache = caches.get(key);
        }
        return cache;
    }
    //抽象方法,交給子類實(shí)現(xiàn)
    protected abstract Cache createCache(URL url);
}

在這里,公共的邏輯就是通過getCahce()創(chuàng)建緩存實(shí)現(xiàn)類,那具體創(chuàng)建什么樣的緩存實(shí)現(xiàn)類,就由子類去決定。

所以,每個子類都是一個個具體的緩存工廠類,比如包括:

ExpiringCacheFactory、JCacheFactory、LruCacheFactory、ThreadLocalCacheFactory。

這些工廠類,只有一個方法,就是創(chuàng)建具體的緩存實(shí)現(xiàn)類。

public class ThreadLocalCacheFactory extends AbstractCacheFactory {
    @Override
    protected Cache createCache(URL url) {
        return new ThreadLocalCache(url);
    }
}

這里的ThreadLocalCache就是具體的緩存實(shí)現(xiàn)類,比如它是通過ThreadLocal來實(shí)現(xiàn)緩存功能。

public class ThreadLocalCache implements Cache {

    private final ThreadLocal<Map<Object, Object>> store;

    public ThreadLocalCache(URL url) {
        this.store = new ThreadLocal<Map<Object, Object>>() {
            @Override
            protected Map<Object, Object> initialValue() {
                return new HashMap<Object, Object>();
            }
        };
    }
    @Override
    public void put(Object key, Object value) {
        store.get().put(key, value);
    }
    @Override
    public Object get(Object key) {
        return store.get().get(key);
    }
}

那在客戶端使用的時候,還是通過工廠來獲取緩存對象。

public static void main(String[] args) {
    URL url = URL.valueOf("http://localhost:8080/cache=jacache&.cache.write.expire=1");
    Invocation invocation = new RpcInvocation();
    CacheFactory cacheFactory = new ThreadLocalCacheFactory();
    Cache cache = cacheFactory.getCache(url, invocation);
    cache.put("java","java");
    System.out.println(cache.get("java"));
}

這樣做的好處有兩點(diǎn)。

第一,如果增加新的緩存實(shí)現(xiàn),只要添加一個新的緩存工廠類就可以,別的都無需改動。

第二,通過模板方法模式,封裝不變部分,擴(kuò)展可變部分。 提取公共代碼,便于維護(hù)。

另外,在Dubbo中,注冊中心的獲取也是通過工廠方法來實(shí)現(xiàn)的。

3、抽象工廠

抽象工廠模式,它能創(chuàng)建一系列相關(guān)的對象, 而無需指定其具體類。

工廠方法模式和抽象工廠模式,它們之間最大的區(qū)別在于:

  • 工廠方法模式只有一個抽象產(chǎn)品類,具體工廠類只能創(chuàng)建一個具體產(chǎn)品類的實(shí)例;
  • 抽象工廠模式有多個抽象產(chǎn)品類,具體工廠類可以創(chuàng)建多個具體產(chǎn)品類的實(shí)例。

我們拿上面緩存的例子來繼續(xù)往下說。

如果我們現(xiàn)在有一個數(shù)據(jù)訪問程序,需要同時操作緩存和數(shù)據(jù)庫,那就需要多個抽象產(chǎn)品和多個具體產(chǎn)品實(shí)現(xiàn)。

緩存相關(guān)的產(chǎn)品類都已經(jīng)有了,我們接著來創(chuàng)建數(shù)據(jù)庫相關(guān)的產(chǎn)品實(shí)現(xiàn)。

首先,有一個數(shù)據(jù)庫接口,它是抽象產(chǎn)品類。

public interface DataBase {
    void insert(Object tableName, Object record);
    Object select(Object tableName);
}

然后,我們創(chuàng)建兩個具體產(chǎn)品類MysqlDataBase和OracleDataBase。

public class MysqlDataBase implements DataBase{
    Map<Object,Object> mysqlDb = new HashMap<>();
    @Override
    public void insert(Object tableName, Object record) {
        mysqlDb.put(tableName,record);
    }
    @Override
    public Object select(Object tableName) {
        return mysqlDb.get(tableName);
    }
}

public class OracleDataBase implements DataBase {
    Map<Object,Object> oracleDb = new HashMap<>();
    @Override
    public void insert(Object tableName, Object record) {
        oracleDb.put(tableName,record);
    }
    @Override
    public Object select(Object tableName) {
        return oracleDb.get(tableName);
    }
}

其次,創(chuàng)建抽象的工廠類,它可以返回一個緩存對象和數(shù)據(jù)庫對象。

public interface DataAccessFactory {
    Cache getCache(URL url);
    DataBase getDb();
}

最后是具體的工廠類,可以根據(jù)實(shí)際的需求,任意組合每一個具體的產(chǎn)品。

比如我們需要一個基于ThreadLocal的緩存實(shí)現(xiàn)和基于Mysql的數(shù)據(jù)庫對象。

public class DataAccessFactory1 implements DataAccessFactory {
    @Override
    public Cache getCache(URL url) {
        return new ThreadLocalCache(url);
    }
    @Override
    public DataBase getDb() {
        return new MysqlDataBase();
    }
}

如果需要一個基于Lru的緩存實(shí)現(xiàn)和基于Oracle的數(shù)據(jù)庫對象。


public class DataAccessFactory2 implements DataAccessFactory {
    @Override
    public Cache getCache(URL url) {
        return new LruCache(url);
    }
    @Override
    public DataBase getDb() {
        return new OracleDataBase();
    }
}

可以看到,抽象工廠模式隔離了具體類的生成,使得客戶并不需要知道什么被創(chuàng)建。由于這種隔離,更換一個具體工廠就變得相對容易,所有的具體工廠都實(shí)現(xiàn)了抽象工廠中定義的那些公共接口,因此只需改變具體工廠的實(shí)例,就可以在某種程度上改變整個軟件系統(tǒng)的行為。

三、模板方法模式

在模板模式中,一個抽象類公開定義了執(zhí)行它的方法的方式/模板。它的子類可以按需要重寫方法實(shí)現(xiàn),但調(diào)用將以抽象類中定義的方式進(jìn)行。

簡單來說,有多個子類共有的方法,且邏輯相同,可以考慮作為模板方法。

在上面Dubbo緩存的例子中,我們已經(jīng)看到了模板方法模式的應(yīng)用。但那個是和工廠方法模式結(jié)合在一塊的,我們再單獨(dú)找找模板方法模式的應(yīng)用。

我們知道,當(dāng)我們的Dubbo應(yīng)用出現(xiàn)多個服務(wù)提供者時,服務(wù)消費(fèi)者需要通過負(fù)載均衡算法,選擇其中一個服務(wù)來進(jìn)行調(diào)用。

首先,有一個LoadBalance接口,返回一個Invoker。

public interface LoadBalance {
    <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}

然后定義一個抽象類,AbstractLoadBalance,實(shí)現(xiàn)LoadBalance接口。

public abstract class AbstractLoadBalance implements LoadBalance {

    @Override
    public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        if (invokers == null || invokers.isEmpty()) {
            return null;
        }
        if (invokers.size() == 1) {
            return invokers.get(0);
        }
        return doSelect(invokers, url, invocation);
    }
    //抽象方法,由子類選擇一個Invoker
    protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
}

這里的公共邏輯就是兩個判斷,判斷invokers集合是否為空或者是否只有一個實(shí)例。如果是的話,就無需調(diào)用子類,直接返回就好了。

具體的負(fù)載均衡實(shí)現(xiàn)有四個:

  • 基于權(quán)重隨機(jī)算法的 RandomLoadBalance
  • 基于最少活躍調(diào)用數(shù)算法的 LeastActiveLoadBalance
  • 基于 hash 一致性的 ConsistentHashLoadBalance
  • 基于加權(quán)輪詢算法的 RoundRobinLoadBalance
public class RandomLoadBalance extends AbstractLoadBalance {
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        //省略邏輯....
        return invokers.get(ThreadLocalRandom.current().nextInt(length));
    }
}

它們根據(jù)不同的算法實(shí)現(xiàn),來返回一個具體的Invoker對象。

四、構(gòu)造器模式

構(gòu)造器模式使用多個簡單的對象一步一步構(gòu)建成一個復(fù)雜的對象。這種類型的設(shè)計模式屬于創(chuàng)建型模式,它提供了一種創(chuàng)建對象的最佳方式。

這種模式,常見于在構(gòu)建一個復(fù)雜的對象,里面可能包含一些業(yè)務(wù)邏輯,比如檢查,屬性轉(zhuǎn)換等。如果都在客戶端手動去設(shè)置,那么會產(chǎn)生大量的冗余代碼。那么這時候,我們就可以考慮使用構(gòu)造器模式。

比如,在Mybatis中,MappedStatement的創(chuàng)建過程就使用了構(gòu)造器模式。

我們知道,XML文件中的每一個SQL標(biāo)簽就要生成一個MappedStatement對象,它里面包含很多個屬性,我們要構(gòu)造的對象也是它。

public final class MappedStatement {
    private String resource;
    private Configuration configuration;
    private String id;
    private SqlSource sqlSource;
    private ParameterMap parameterMap;
    private List<ResultMap> resultMaps;
    //.....省略大部分屬性
}

然后有一個內(nèi)部類Builder,它負(fù)責(zé)完成MappedStatement對象的構(gòu)造。

首先,這個Builder類,通過默認(rèn)的構(gòu)造函數(shù),先完成對MappedStatement對象,部分的構(gòu)造。

public static class Builder {

    private MappedStatement mappedStatement = new MappedStatement();
    
    public Builder(Configuration configuration, String id, SqlSource sqlSource, SqlCommandType sqlCommandType) {
        mappedStatement.configuration = configuration;
        mappedStatement.id = id;
        mappedStatement.sqlSource = sqlSource;
        mappedStatement.statementType = StatementType.PREPARED;
        mappedStatement.resultSetType = ResultSetType.DEFAULT;
        //.....省略大部分過程
    }
}

然后,通過一系列方法,可以設(shè)置特定的屬性,并返回這個Builder類,這里的方法適合處理一些業(yè)務(wù)邏輯。

public static class Builder {

    public Builder parameterMap(ParameterMap parameterMap) {
      mappedStatement.parameterMap = parameterMap;
      return this;
    }
    
    public Builder resultMaps(List<ResultMap> resultMaps) {
      mappedStatement.resultMaps = resultMaps;
      for (ResultMap resultMap : resultMaps) {
        mappedStatement.hasNestedResultMaps = mappedStatement.hasNestedResultMaps || resultMap.hasNestedResultMaps();
      }
      return this;
    }
    
    public Builder statementType(StatementType statementType) {
      mappedStatement.statementType = statementType;
      return this;
    }

    public Builder resultSetType(ResultSetType resultSetType) {
      mappedStatement.resultSetType = resultSetType == null ? ResultSetType.DEFAULT : resultSetType;
      return this;
    }
}

最后呢,就是提供一個build方法,返回構(gòu)建完成的對象就好了。

public MappedStatement build() {
    assert mappedStatement.configuration != null;
    assert mappedStatement.id != null;
    assert mappedStatement.sqlSource != null;
    assert mappedStatement.lang != null;
    mappedStatement.resultMaps = Collections.unmodifiableList(mappedStatement.resultMaps);
    return mappedStatement;
}

在客戶端使用的時候,先創(chuàng)建一個 Builder,然后鏈?zhǔn)降恼{(diào)用一堆方法,最后再調(diào)用一次 build() 方法,我們需要的對象就有了,這就是構(gòu)造器模式的應(yīng)用。

MappedStatement.Builder statementBuilder = new MappedStatement.Builder(configuration, id, sqlSource, sqlCommandType)
    .resource(resource)
    .fetchSize(fetchSize)
    .timeout(timeout)
    .statementType(statementType)
    .keyGenerator(keyGenerator)
    .keyProperty(keyProperty)
    .keyColumn(keyColumn)
    .databaseId(databaseId)
    .lang(lang)
    .resultOrdered(resultOrdered)
    .resultSets(resultSets)
    .resultMaps(getStatementResultMaps(resultMap, resultType, id))
    .resultSetType(resultSetType)
    .flushCacheRequired(valueOrDefault(flushCache, !isSelect))
    .useCache(valueOrDefault(useCache, isSelect))
    .cache(currentCache);

ParameterMap statementParameterMap = getStatementParameterMap(parameterMap, parameterType, id);
MappedStatement statement = statementBuilder.build();
configuration.addMappedStatement(statement);
return statement;

五、適配器模式

適配器模式是作為兩個不兼容的接口之間的橋梁。這種類型的設(shè)計模式屬于結(jié)構(gòu)型模式,它結(jié)合了兩個獨(dú)立接口的功能。

適配器模式一般用于屏蔽業(yè)務(wù)邏輯與第三方服務(wù)的交互,或者是新老接口之間的差異。

我們知道,在Dubbo中,所有的數(shù)據(jù)都是通過Netty來負(fù)責(zé)傳輸?shù)?,然后這就涉及了消息編解碼的問題。

所以,首先它有一個編解碼器的接口,負(fù)責(zé)編碼和解碼。

@SPI
public interface Codec2 {

    @Adaptive({Constants.CODEC_KEY})
    void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;
    
    @Adaptive({Constants.CODEC_KEY})
    Object decode(Channel channel, ChannelBuffer buffer) throws IOException;
    
    enum DecodeResult {
        NEED_MORE_INPUT, SKIP_SOME_INPUT
    }
}

然后,有幾個實(shí)現(xiàn)類,比如DubboCountCodec、DubboCodec、ExchangeCodec等。

但是,當(dāng)我們打開這些類的時候,就會發(fā)現(xiàn),他們都是Dubbo中普通的類,只是實(shí)現(xiàn)了Codec2接口,其實(shí)不能直接作用于Netty編解碼。

這是因?yàn)?,Netty編解碼需要實(shí)現(xiàn)ChannelHandler接口,這樣才會被聲明成Netty的處理組件。比如像MessageToByteEncoder、ByteToMessageDecoder那樣。

鑒于此,Dubbo搞了一個適配器,專門來適配編解碼器接口。

final public class NettyCodecAdapter {

    private final ChannelHandler encoder = new InternalEncoder();
    private final ChannelHandler decoder = new InternalDecoder();
    private final Codec2 codec;
    private final URL url;
    private final org.apache.dubbo.remoting.ChannelHandler handler;
    
    public NettyCodecAdapter(Codec2 codec, URL url, org.apache.dubbo.remoting.ChannelHandler handler) {
        this.codec = codec;
        this.url = url;
        this.handler = handler;
    }
    public ChannelHandler getEncoder() {
        return encoder;
    }
    public ChannelHandler getDecoder() {
        return decoder;
    }
    
    private class InternalEncoder extends MessageToByteEncoder {
        @Override
        protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
            org.apache.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
            Channel ch = ctx.channel();
            NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
            codec.encode(channel, buffer, msg);
        }
    }
    private class InternalDecoder extends ByteToMessageDecoder {
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
            ChannelBuffer message = new NettyBackedChannelBuffer(input);
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
            //解碼對象
            codec.decode(channel, message);
            //省略部分代碼...
        }
    }
}

上面的代碼中,我們看到,NettyCodecAdapter類適配的是Codec2接口,通過構(gòu)造函數(shù)傳遞實(shí)現(xiàn)類,然后定義了內(nèi)部的編碼器實(shí)現(xiàn)和解碼器實(shí)現(xiàn),同時它們都是ChannelHandler。

這樣的話,在內(nèi)部類里面的編碼和解碼邏輯,真正調(diào)用的還是Codec2接口。

最后我們再來看看,該適配器的調(diào)用方式。

//通過SPI方式獲取編解碼器的實(shí)現(xiàn)類,比如這里是DubboCountCodec
Codec2 codec = ExtensionLoader.getExtensionLoader(Codec2.class).getExtension("dubbo");
URL url = new URL("dubbo", "localhost", 22226);
//創(chuàng)建適配器
NettyCodecAdapter adapter = new NettyCodecAdapter(codec, url, NettyClient.this);
//向ChannelPipeline中添加編解碼處理器
ch.pipeline()
    .addLast("decoder", adapter.getDecoder())
    .addLast("encoder", adapter.getEncoder())

以上,就是Dubbo中關(guān)于編解碼器對于適配器模式的應(yīng)用。

六、責(zé)任鏈模式

責(zé)任鏈模式為請求創(chuàng)建了一個接收者對象的鏈。允許你將請求沿著處理者鏈進(jìn)行發(fā)送。收到請求后,每個處理者均可對請求進(jìn)行處理, 或?qū)⑵鋫鬟f給鏈上的下個處理者。

我們來看一個Netty中的例子。我們知道,在Netty中服務(wù)端處理消息,就要添加一個或多個ChannelHandler。那么,承載這些ChannelHandler的就是ChannelPipeline,它的實(shí)現(xiàn)過程就體現(xiàn)了責(zé)任鏈模式的應(yīng)用。

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    protected void initChannel(NioSocketChannel channel) {
        channel.pipeline()
            .addLast(new ChannelHandler1())
            .addLast(new ChannelHandler2())
            .addLast(new ChannelHandler3());
    }
});

需要知道的是,在 Netty 整個框架里面,一條連接對應(yīng)著一個 Channel,每一個新創(chuàng)建的 Channel 都將會被分配一個新的 ChannelPipeline。

ChannelPipeline里面保存的是ChannelHandlerContext對象,它是Channel相關(guān)的上下文對象,里面包著我們定義的處理器ChannelHandler。

根據(jù)事件的起源,IO事件將會被 ChannelInboundHandler或者ChannelOutboundHandler處理。隨后,通過調(diào)用ChannelHandlerContext 實(shí)現(xiàn),它將被轉(zhuǎn)發(fā)給同一超類型的下一個ChannelHandler。

1、ChannelHandler

首先,我們來看責(zé)任處理器接口,Netty中的ChannelHandler,它充當(dāng)了所有處理入站和出站數(shù)據(jù)的應(yīng)用程序邏輯的容器。

public interface ChannelHandler {
    //當(dāng)把 ChannelHandler 添加到 ChannelPipeline 中時被調(diào)用
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;
    //當(dāng)從 ChannelPipeline 中移除 ChannelHandler 時被調(diào)用
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
    //當(dāng)處理過程中在 ChannelPipeline 中有錯誤產(chǎn)生時被調(diào)用
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

然后 Netty 定義了下面兩個重要的 ChannelHandler 子接口:

  • ChannelInboundHandler,處理入站數(shù)據(jù)以及各種狀態(tài)變化;
public interface ChannelInboundHandler extends ChannelHandler {
    //當(dāng) Channel 已經(jīng)注冊到它的 EventLoop 并且能夠處理 I/O 時被調(diào)用
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;
    //當(dāng) Channel 從它的 EventLoop 注銷并且無法處理任何 I/O 時被調(diào)用
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception; 
    //當(dāng) Channel 處于活動狀態(tài)時被調(diào)用;Channel 已經(jīng)連接/綁定并且已經(jīng)就緒
    void channelActive(ChannelHandlerContext ctx) throws Exception;   
    //當(dāng) Channel 離開活動狀態(tài)并且不再連接它的遠(yuǎn)程節(jié)點(diǎn)時被調(diào)用
    void channelInactive(ChannelHandlerContext ctx) throws Exception;  
    當(dāng)從 Channel 讀取數(shù)據(jù)時被調(diào)用
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;   
    //當(dāng) Channel上的一個讀操作完成時被調(diào)用
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception; 
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
  • ChannelOutboundHandler,處理出站數(shù)據(jù)并且允許攔截所有的操作;
public interface ChannelOutboundHandler extends ChannelHandler {
    
    //當(dāng)請求將 Channel 綁定到本地地址時被調(diào)用
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
    //當(dāng)請求將 Channel 連接到遠(yuǎn)程節(jié)點(diǎn)時被調(diào)用
    void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,SocketAddress localAddress, 
        ChannelPromise promise) throws Exception;
    //當(dāng)請求將 Channel 從遠(yuǎn)程節(jié)點(diǎn)斷開時被調(diào)用
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    //當(dāng)請求關(guān)閉 Channel 時被調(diào)用
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    //當(dāng)請求將 Channel 從它的 EventLoop 注銷時被調(diào)用
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    //當(dāng)請求從 Channel 讀取更多的數(shù)據(jù)時被調(diào)用
    void read(ChannelHandlerContext ctx) throws Exception;
    //當(dāng)請求通過 Channel 將數(shù)據(jù)寫到遠(yuǎn)程節(jié)點(diǎn)時被調(diào)用
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
    //當(dāng)請求通過 Channel 將入隊數(shù)據(jù)沖刷到遠(yuǎn)程節(jié)點(diǎn)時被調(diào)用
    void flush(ChannelHandlerContext ctx) throws Exception;
}

2、ChannelPipeline

既然叫做責(zé)任鏈模式,那就需要有一個“鏈”,在Netty中就是ChannelPipeline。

ChannelPipeline 提供了 ChannelHandler 鏈的容器,并定義了用于在該鏈上傳播入站和出站事件流的方法,另外它還具有添加刪除責(zé)任處理器接口的功能。

public interface ChannelPipeline{
    ChannelPipeline addFirst(String name, ChannelHandler handler);
    ChannelPipeline addLast(String name, ChannelHandler handler);
    ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
    ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
    ChannelPipeline remove(ChannelHandler handler);
    ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);
    @Override
    ChannelPipeline fireChannelRegistered();
    @Override
    ChannelPipeline fireChannelActive();
    @Override
    ChannelPipeline fireExceptionCaught(Throwable cause);
    @Override
    ChannelPipeline fireUserEventTriggered(Object event);
    @Override
    ChannelPipeline fireChannelRead(Object msg);
    @Override
    ChannelPipeline flush();
    //省略部分方法.....
}

然后我們看它的實(shí)現(xiàn),默認(rèn)有兩個節(jié)點(diǎn),頭結(jié)點(diǎn)和尾結(jié)點(diǎn)。并在構(gòu)造函數(shù)中,使它們首尾相連。這就是標(biāo)準(zhǔn)的鏈?zhǔn)浇Y(jié)構(gòu)。

public class DefaultChannelPipeline implements ChannelPipeline {

    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    private final Channel channel;
    
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        tail = new TailContext(this);
        head = new HeadContext(this);
        head.next = tail;
        tail.prev = head;
    }
}

當(dāng)有新的ChannelHandler被添加時,則將其封裝為ChannelHandlerContext對象,然后插入到鏈表中。

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

3、ChannelHandlerContext

ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之間的關(guān)聯(lián),每當(dāng)有 ChannelHandler 添加到 ChannelPipeline 中時,都會創(chuàng)建 ChannelHandlerContext。

ChannelHandlerContext 的主要功能是管理它所關(guān)聯(lián)的 ChannelHandler 和在同一個 ChannelPipeline 中的其他 ChannelHandler 之間的交互。

public interface ChannelHandlerContext{
    Channel channel();
    EventExecutor executor();
    ChannelHandler handler();
    ChannelPipeline pipeline();
    @Override
    ChannelHandlerContext fireChannelRegistered();
    @Override
    ChannelHandlerContext fireChannelUnregistered();
    @Override
    ChannelHandlerContext fireChannelActive();
    @Override
    ChannelHandlerContext fireChannelRead(Object msg);
    @Override
    ChannelHandlerContext read();
    @Override
    ChannelHandlerContext flush();
    //省略部分方法...
}

ChannelHandlerContext負(fù)責(zé)在鏈上傳播責(zé)任處理器接口的事件。

它有兩個重要的方法,查找Inbound類型和Outbound類型的處理器。

值得注意的是,如果一個入站事件被觸發(fā),它將被從ChannelPipeline的頭部開始一直被傳播到ChannelPipeline的尾端;一個出站事件將從ChannelPipeline的最右邊開始,然后向左傳播。

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;
    
    //查找下一個Inbound類型的處理器,左 > 右
    private AbstractChannelHandlerContext findContextInbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        do {
            ctx = ctx.next;
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
        return ctx;
    }
    //查找下一個Outbound類型的處理器,右 > 左
    private AbstractChannelHandlerContext findContextOutbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        do {
            ctx = ctx.prev;
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
        return ctx;
    }
}

4、處理流程

當(dāng)我們向服務(wù)端發(fā)送消息的時候,將會觸發(fā)read方法。

public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    public final void read() {  
        //從Channel中獲取對應(yīng)的ChannelPipeline
        final ChannelPipeline pipeline = pipeline();
        //數(shù)據(jù)載體
        ByteBuf byteBuf = allocHandle.allocate(allocator);
        //傳遞數(shù)據(jù)
        pipeline.fireChannelRead(byteBuf);
    }
}

上面的代碼中,就會調(diào)用到ChannelPipeline,它會從Head節(jié)點(diǎn)開始,根據(jù)上下文對象依次調(diào)用處理器。

public class DefaultChannelPipeline implements ChannelPipeline {
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
}

因?yàn)榈谝粋€節(jié)點(diǎn)是默認(rèn)的頭結(jié)點(diǎn)HeadContext,所以它是從ChannelHandlerContext開始的。

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    //找到下一個ChannelHandler并執(zhí)行
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
        return this;
    }
    
}

然后在我們自定義的ChannelHandler中,就會被調(diào)用到。

public class ChannelHandler1 extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg){
        System.out.println("ChannelHandler1:"+msg);
        ctx.fireChannelRead(msg);
    }
}

如果消息有多個ChannelHandler,你可以自由選擇是否繼續(xù)往下傳遞請求。

比如,如果你認(rèn)為消息已經(jīng)被處理且不應(yīng)該繼續(xù)往下調(diào)用,把上面的ctx.fireChannelRead(msg);注釋掉就終止了整個責(zé)任鏈。

七、策略模式

該模式定義了一系列算法,并將每個算法封裝起來,使它們可以相互替換,且算法的變化不會影響使用算法的客戶。

策略模式是一個很常見,而且也很好用的設(shè)計模式,如果你的業(yè)務(wù)代碼中有大量的if...else,那么就可以考慮是否可以使用策略模式改造一下。

RocketMQ我們大家都熟悉,是一款優(yōu)秀的分布式消息中間件。消息中間件,簡單來說,就是客戶端發(fā)送一條消息,服務(wù)端存儲起來并提供給消費(fèi)者去消費(fèi)。

請求消息的類型多種多樣,處理過程肯定也不一樣,每次都判斷一下再處理就落了下乘。在RocketMQ里,它會把所有處理器注冊起來,然后根據(jù)請求消息的 code ,讓對應(yīng)的處理器處理請求,這就是策略模式的應(yīng)用。

首先,它們需要實(shí)現(xiàn)同一個接口,在這里就是請求處理器接口。

public interface NettyRequestProcessor {
    RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws Exception;
    boolean rejectRequest();
}

這個接口只做一件事,就是處理來自客戶端的請求。不同類型的請求封裝成不同的RemotingCommand對象。

RocketMQ大概有90多種請求類型,都在RequestCode里以 code 來區(qū)分。

然后,定義一系列策略類。我們來看幾個。

//默認(rèn)的消息處理器
public class DefaultRequestProcessor implements NettyRequestProcessor {}
//發(fā)送消息的處理器
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {}
//拉取消息的處理器
public class PullMessageProcessor implements NettyRequestProcessor {}
//查詢消息的處理器
public class QueryMessageProcessor implements NettyRequestProcessor {}
//消費(fèi)者端管理的處理器
public class ConsumerManageProcessor implements NettyRequestProcessor {}

接著,將這些策略類封裝起來。在RocketMQ中,在啟動Broker服務(wù)器的時候,注冊這些處理器。

public class BrokerController {

    public void registerProcessor() {
    
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE,this.pullMessageProcessor,this.pullMessageExecutor);
        
        this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
        
        NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
        
        ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
        this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
        //省略部分注冊過程.....
    }
}

最后,在Netty接收到客戶端的請求之后,就會根據(jù)消息的類型,找到對應(yīng)的策略類,去處理消息。

public abstract class NettyRemotingAbstract {

    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        //根據(jù)請求類型找到對應(yīng)的策略類
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        //如果沒有找到就使用默認(rèn)的
        final Pair<NettyRequestProcessor, ExecutorService> pair = 
                    null == matched ? this.defaultRequestProcessor : matched;
        //執(zhí)行策略
        final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
        //省略大部分代碼......
    }
}

如果有了新的請求消息類型,RocketMQ也無需修改業(yè)務(wù)代碼,新增策略類并將其注冊進(jìn)來就好了。

八、代理模式

代理模式,為其他對象提供一種代理以控制對這個對象的訪問。

在一些開源框架或中間件產(chǎn)品中,代理模式會非常常見。我們使用的時候越簡便,框架在背后幫我們做的事就可能越復(fù)雜。這里面往往都體現(xiàn)著代理模式的應(yīng)用,頗有移花接木的味道。

1、Dubbo

Dubbo作為一個RPC框架,其中有一個很重要的功能就是:

提供高性能的基于代理的遠(yuǎn)程調(diào)用能力,服務(wù)以接口為粒度,為開發(fā)者屏蔽遠(yuǎn)程調(diào)用底層細(xì)節(jié)。

這里我們關(guān)注兩個重點(diǎn):

  • 面向接口代理;
  • 屏蔽調(diào)用底層細(xì)節(jié)。

比如我們有一個庫存服務(wù),它提供一個扣減庫存的接口。

public interface StorageDubboService {
    int decreaseStorage(StorageDTO storage);
}

在別的服務(wù)里,需要扣減庫存的時候,就會通過Dubbo引用這個接口,也比較簡單。

@Reference
StorageDubboService storageDubboService;

我們使用起來很簡單,可 StorageDubboService 只是一個普通的服務(wù)類,并不具備遠(yuǎn)程調(diào)用的能力。

Dubbo就是給這些服務(wù)類,創(chuàng)建了代理類。通過ReferenceBean來創(chuàng)建并返回一個代理對象。

public class ReferenceBean<T>{
    @Override
    public Object getObject() {
        return get();
    }
    public synchronized T get() {
        if (ref == null) {
            init();
        }
        return ref;
    }
}

在我們使用的時候,實(shí)則調(diào)用的是代理對象,代理對象完成復(fù)雜的遠(yuǎn)程調(diào)用。比如連接注冊中心、負(fù)載均衡、集群容錯、連接服務(wù)器發(fā)送消息等功能。

2、Mybatis

還有一個典型的應(yīng)用,就是我們經(jīng)常在用的Mybatis。我們在使用的時候,一般只操作Mapper接口,然后Mybatis會找到對應(yīng)的SQL語句來執(zhí)行。

public interface UserMapper {   
    List<User> getUserList();
}

如上代碼,UserMapper也只是一個普通的接口,它是怎樣最終執(zhí)行到我們的SQL語句的呢?

答案也是代理。當(dāng)Mybatis掃描到我們定義的Mapper接口時,會將其設(shè)置為MapperFactoryBean,并創(chuàng)建返回一個代理對象。

protected T newInstance(SqlSession sqlSession) {
    final MapperProxy<T> mapperProxy = new MapperProxy<>(sqlSession, mapperInterface, methodCache);
    return (T) Proxy.newProxyInstance(mapperInterface.getClassLoader(), new Class[] { mapperInterface }, mapperProxy);
}

代理對象去通過請求的方法名找到MappedStatement對象,調(diào)用執(zhí)行器,解析SqlSource對象來生成SQL,執(zhí)行并解析返回結(jié)果等。

以上案例具體的實(shí)現(xiàn)過程,在這里就不再深入細(xì)聊。有興趣可能翻閱筆者其他文章~

九、裝飾器模式

裝飾器模式,在不改變現(xiàn)有對象結(jié)構(gòu)的情況下,動態(tài)地給該對象增加一些職責(zé)(即增加其額外功能)的模式,它屬于對象結(jié)構(gòu)型模式。

Mybatis里的緩存設(shè)計,就是裝飾器模式的典型應(yīng)用。

首先,我們知道,MyBatis 執(zhí)行器是 MyBatis 調(diào)度的核心,它負(fù)責(zé)SQL語句的生成和執(zhí)行。

在創(chuàng)建SqlSession的時候,會創(chuàng)建這個執(zhí)行器,默認(rèn)的執(zhí)行器是SimpleExecutor。

但是為了給執(zhí)行器增加緩存的職責(zé),就變成了在SimpleExecutor上一層添加了CachingExecutor。

在CachingExecutor中的實(shí)際操作還是委托給SimpleExecutor去執(zhí)行,只是在執(zhí)行前后增加了緩存的操作。

首先,我們來看看它的裝飾過程。

public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
    //默認(rèn)的執(zhí)行器
    executorType = executorType == null ? defaultExecutorType : executorType;
    executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
    Executor executor;
    if (ExecutorType.BATCH == executorType) {
        executor = new BatchExecutor(this, transaction);
    } else if (ExecutorType.REUSE == executorType) {
        executor = new ReuseExecutor(this, transaction);
    } else {
        executor = new SimpleExecutor(this, transaction);
    }
    //使用緩存執(zhí)行器來裝飾
    if (cacheEnabled) {
        executor = new CachingExecutor(executor);
    }
    executor = (Executor) interceptorChain.pluginAll(executor);
    return executor;
}

當(dāng)SqlSession執(zhí)行方法的時候,則會先調(diào)用到CachingExecutor,我們來看查詢方法。

public class CachingExecutor implements Executor {
    @Override
    public <E> List<E> query()throws SQLException {
        Cache cache = ms.getCache();
        if (cache != null) {
            flushCacheIfRequired(ms);
            if (ms.isUseCache() && resultHandler == null) {
                List<E> list = (List<E>) tcm.getObject(cache, key);
                if (list == null) {
                    list = delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
                    tcm.putObject(cache, key, list); // issue #578 and #116
                }
                return list;
            }
        }
        return delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
    }
}

這里的代碼,如果開啟了緩存,則先從緩存中獲取結(jié)果。如果沒有開啟緩存或者緩存中沒有結(jié)果,則再調(diào)用SimpleExecutor執(zhí)行器去數(shù)據(jù)庫中查詢。

十、觀察者模式

觀察者模式,定義對象間的一種一對多的依賴關(guān)系,當(dāng)一個對象的狀態(tài)發(fā)生改變時,所有依賴于它的對象都得到通知并被自動更新。

在Spring或者SpringBoot項(xiàng)目中,有時候我們需要在Spring容器啟動并加載完之后,做一些系統(tǒng)初始化的事情。這時候,我們可以配置一個觀察者ApplicationListener,來達(dá)到這一目的。這就是觀察者模式的實(shí)踐。

@Component
public class ApplicationStartup implements ApplicationListener<ContextRefreshedEvent> {
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        System.out.println("干一些系統(tǒng)初始化的事情....");
        ApplicationContext context = event.getApplicationContext();
        String[] names = context.getBeanDefinitionNames();
        for (String beanName:names){
            System.out.println("----------"+beanName+"---------");
        }
    }
}

首先,我們知道,ApplicationContext是 Spring 中的核心容器。

public abstract class AbstractApplicationContext extends DefaultResourceLoader implements ConfigurableApplicationContext {
    
    //觀察者容器
    private final Set<ApplicationListener<?>> applicationListeners = new LinkedHashSet<>();
    //被觀察者
    private ApplicationEventMulticaster applicationEventMulticaster;
}

在ApplicationContext容器刷新的時候,會初始化一個被觀察者,并注冊到Spring容器中。

然后,注冊各種觀察者到被觀察者中,形成一對多的依賴。

public abstract class AbstractApplicationContext{
    
    protected void registerListeners() {
        for (ApplicationListener<?> listener : getApplicationListeners()) {
            getApplicationEventMulticaster().addApplicationListener(listener);
        }
        String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
        for (String listenerBeanName : listenerBeanNames) {
            getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
        }
        Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
        this.earlyApplicationEvents = null;
        if (earlyEventsToProcess != null) {
            for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
                getApplicationEventMulticaster().multicastEvent(earlyEvent);
            }
        }
    }
}

這時候,我們自定義的觀察者對象也被注冊到了applicationEventMulticaster里面。

最后,當(dāng)ApplicationContext完成刷新后,則發(fā)布ContextRefreshedEvent事件。

protected void finishRefresh() {
    publishEvent(new ContextRefreshedEvent(this));
}

通知觀察者,調(diào)用ApplicationListener.onApplicationEvent()。

private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
    listener.onApplicationEvent(event);
}

接下來我們再看看在Dubbo是如何應(yīng)用這一機(jī)制的。

Dubbo服務(wù)導(dǎo)出過程始于 Spring 容器發(fā)布刷新事件,Dubbo 在接收到事件后,會立即執(zhí)行服務(wù)導(dǎo)出邏輯。

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
        ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware,
        ApplicationEventPublisherAware {

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (!isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export();
        }
    }
}

我們看到,Dubbo中的ServiceBean也實(shí)現(xiàn)了 ApplicationListener 接口,在 Spring 容器發(fā)布刷新事件之后就會執(zhí)行導(dǎo)出方法。我們重點(diǎn)關(guān)注,在Dubbo執(zhí)行完導(dǎo)出之后,它也發(fā)布了一個事件。

public class ServiceBean<T>{
    
    public void export() {
        super.export();
        publishExportEvent();
    }
    private void publishExportEvent() {
        ServiceBeanExportedEvent exportEvent = new ServiceBeanExportedEvent(this);
        applicationEventPublisher.publishEvent(exportEvent);
    }
}

ServiceBeanExportedEvent,服務(wù)導(dǎo)出事件,需要繼承Spring中的事件對象ApplicationEvent。

public class ServiceBeanExportedEvent extends ApplicationEvent {
    public ServiceBeanExportedEvent(ServiceBean serviceBean) {
        super(serviceBean);
    }
    public ServiceBean getServiceBean() {
        return (ServiceBean) super.getSource();
    }
}

然后我們自定義一個ApplicationListener,也就是觀察者,就可以監(jiān)聽到Dubbo服務(wù)接口導(dǎo)出事件了。

@Component
public class ServiceBeanListener implements ApplicationListener<ServiceBeanExportedEvent> {
    @Override
    public void onApplicationEvent(ServiceBeanExportedEvent event) {
        ServiceBean serviceBean = event.getServiceBean();
        String beanName = serviceBean.getBeanName();
        Service service = serviceBean.getService();
        System.out.println(beanName+":"+service);
    }
}

十一、命令模式

命令模式是一種行為設(shè)計模式,它可將請求轉(zhuǎn)換為一個包含與請求相關(guān)的所有信息的獨(dú)立對象。該轉(zhuǎn)換讓你能根據(jù)不同的請求將方法參數(shù)化、延遲請求執(zhí)行或?qū)⑵浞湃腙犃兄校夷軐?shí)現(xiàn)可撤銷操作。

Hystrix是Netflix開源的一款容錯框架,具有自我保護(hù)能力。可以阻止故障的連鎖反應(yīng),快速失敗和優(yōu)雅降級。

它用一個HystrixCommand或者HystrixObservableCommand包裝所有對外部系統(tǒng)/依賴的調(diào)用,每個命令在單獨(dú)線程中/信號授權(quán)下執(zhí)行。這正是命令模式的典型應(yīng)用。

我們來看一個Hystrix應(yīng)用的例子。

首先,我們需要創(chuàng)建一個具體的命令類,通過構(gòu)造函數(shù)傳遞接收者對象。

public class OrderServiceHystrixCommand extends HystrixCommand<Object> {
    
    //接收者,處理業(yè)務(wù)邏輯
    private OrderService orderService;

    public OrderServiceHystrixCommand(OrderService orderService) {
        super(setter());
        this.orderService = orderService;
    }
    //設(shè)置Hystrix相關(guān)參數(shù)
    public static Setter setter() {
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("orderGroup");
        HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("orderService");
        HystrixThreadPoolProperties.Setter threadPoolProperties = HystrixThreadPoolProperties.Setter().withCoreSize(1)
                .withQueueSizeRejectionThreshold(1);
        HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter();
        return Setter.withGroupKey(groupKey)
                .andCommandKey(commandKey)
                .andThreadPoolPropertiesDefaults(threadPoolProperties)
                .andCommandPropertiesDefaults(commandProperties);

    }
    @Override
    protected Object run() throws InterruptedException {
        Thread.sleep(500);
        return orderService.orders();
    }
    @Override
    protected Object getFallback() {
        System.out.println("-------------------------------");
        return new ArrayList();
    }
}

然后,在客戶端調(diào)用的時候,創(chuàng)建這個命令類并執(zhí)行即可。

@RestController
public class OrderController {

    @Autowired
    OrderService orderService;

    @RequestMapping("/orders")
    public Object orders(){
        OrderServiceHystrixCommand command = new OrderServiceHystrixCommand(orderService);
        return command.execute();
    }
}

看上去,命令模式和策略模式有些相像,它們都可以通過某些行為來參數(shù)化對象。但它們的思想有很大區(qū)別。

比如說我們可以使用命令來將任何操作轉(zhuǎn)換為對象,操作的參數(shù)將成為對象的成員變量。同樣的,我們也可以對請求做任何操作,比如延遲執(zhí)行,記錄日志,保存歷史命令等。

而策略模式側(cè)重點(diǎn)在于描述完成某件事的不同方式, 讓你能夠在同一個上下文類中切換算法。

總結(jié)

本文重點(diǎn)介紹了設(shè)計模式在不同框架中的實(shí)現(xiàn),以期讓大家更好地理解模式背后的思想和應(yīng)用場景。歡迎有不同想法的朋友,留言探討~

原創(chuàng)不易,客官們點(diǎn)個贊再走嘛,這將是筆者持續(xù)寫作的動力~

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容