淺談Java SPI原理與其在JDBC、Flink中的應(yīng)用

API vs SPI

API(Application Programming Interface)的概念對(duì)我們來說已經(jīng)是見怪不怪了。在日常開發(fā)過程中,我們需要調(diào)用平臺(tái)/框架提供的API,而我們的下游應(yīng)用也需要調(diào)用上游提供的API。一句話:API站在應(yīng)用的角度定義了功能如何實(shí)現(xiàn)。

但是,如果我們作為服務(wù)提供方,想要豐富現(xiàn)有的系統(tǒng),加入一些原本不具備的相對(duì)完整的能力,若是直接hack代碼的話,不僅要新建或改動(dòng)很多API,還需要重新構(gòu)建相關(guān)的模塊,并且可能無法很好地保證新模塊與舊有模塊的統(tǒng)一性。而Java 6引入了SPI(Service Provider Interface,服務(wù)提供者接口),可以非常方便地幫助我們實(shí)現(xiàn)插件化開發(fā)。顧名思義,SPI仍然遵循基于接口編程的思想,服務(wù)提供方通過實(shí)現(xiàn)SPI定義的接口來擴(kuò)展系統(tǒng),SPI機(jī)制后續(xù)完成發(fā)現(xiàn)與注入的職責(zé)。也就是說,SPI是系統(tǒng)為第三方專門開放的擴(kuò)展規(guī)范以及動(dòng)態(tài)加載擴(kuò)展點(diǎn)的機(jī)制

API和SPI之間的不同可以藉由下圖來說明。

SPI實(shí)現(xiàn)原理

當(dāng)我們作為服務(wù)提供方利用SPI機(jī)制時(shí),需要遵循SPI的約定:

  • 先編寫好服務(wù)接口的實(shí)現(xiàn)類,即服務(wù)提供類;
  • 然后在classpath的META-INF/services目錄下創(chuàng)建一個(gè)以接口全限定名命名的UTF-8文本文件,并在該文件中寫入實(shí)現(xiàn)類的全限定名(如果有多個(gè)實(shí)現(xiàn)類,以換行符分隔);
  • 最后調(diào)用JDK中的java.util.ServiceLoader組件中的load()方法,就會(huì)根據(jù)上述文件來發(fā)現(xiàn)并加載具體的服務(wù)實(shí)現(xiàn)。

簡單看一下ServiceLoader的源碼。首先列舉幾個(gè)重要的屬性,注釋寫得很清楚,就不多廢話了。

private static final String PREFIX = "META-INF/services/";
// The class or interface representing the service being loaded
private final Class<S> service;
// The class loader used to locate, load, and instantiate providers
private final ClassLoader loader;
// The access control context taken when the ServiceLoader is created
private final AccessControlContext acc;
// Cached providers, in instantiation order
private LinkedHashMap<String,S> providers = new LinkedHashMap<>();
// The current lazy-lookup iterator
private LazyIterator lookupIterator;

從load()方法開始向下追溯:

public static <S> ServiceLoader<S> load(Class<S> service) {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    return ServiceLoader.load(service, cl);
}

public static <S> ServiceLoader<S> load(Class<S> service,
                                        ClassLoader loader)
{
    return new ServiceLoader<>(service, loader);
}

private ServiceLoader(Class<S> svc, ClassLoader cl) {
    service = Objects.requireNonNull(svc, "Service interface cannot be null");
    loader = (cl == null) ? ClassLoader.getSystemClassLoader() : cl;
    acc = (System.getSecurityManager() != null) ? AccessController.getContext() : null;
    reload();
}

public void reload() {
    providers.clear();
    lookupIterator = new LazyIterator(service, loader);
}

LazyIterator是一個(gè)懶加載服務(wù)提供類的迭代器(ServiceLoader本身也是實(shí)現(xiàn)了Iterable接口的),維護(hù)在lookupIterator中。在實(shí)際應(yīng)用中,我們需要調(diào)用ServiceLoader#iterator()方法獲取加載到的服務(wù)提供類的結(jié)果,該方法的代碼如下。

public Iterator<S> iterator() {
    return new Iterator<S>() {
        Iterator<Map.Entry<String,S>> knownProviders
            = providers.entrySet().iterator();

        public boolean hasNext() {
            if (knownProviders.hasNext())
                return true;
            return lookupIterator.hasNext();
        }

        public S next() {
            if (knownProviders.hasNext())
                return knownProviders.next().getValue();
            return lookupIterator.next();
        }

        public void remove() {
            throw new UnsupportedOperationException();
        }
    };
}

該方法返回一個(gè)標(biāo)準(zhǔn)的迭代器,先從緩存的providers容器中獲取,若獲取不到,再通過lookupIterator進(jìn)行懶加載。內(nèi)部類LazyIterator的部分相關(guān)代碼如下。

private class LazyIterator implements Iterator<S> {
    Class<S> service;
    ClassLoader loader;
    Enumeration<URL> configs = null;
    Iterator<String> pending = null;
    String nextName = null;

    private LazyIterator(Class<S> service, ClassLoader loader) {
        this.service = service;
        this.loader = loader;
    }

    // Iterator.hasNext()方法直接調(diào)用了此方法
    private boolean hasNextService() {
        if (nextName != null) {
            return true;
        }
        if (configs == null) {
            try {
                String fullName = PREFIX + service.getName();
                if (loader == null)
                    configs = ClassLoader.getSystemResources(fullName);
                else
                    configs = loader.getResources(fullName);
            } catch (IOException x) {
                fail(service, "Error locating configuration files", x);
            }
        }
        while ((pending == null) || !pending.hasNext()) {
            if (!configs.hasMoreElements()) {
                return false;
            }
            pending = parse(service, configs.nextElement());
        }
        nextName = pending.next();
        return true;
    }

     // Iterator.next()方法直接調(diào)用了此方法
    private S nextService() {
        if (!hasNextService())
            throw new NoSuchElementException();
        String cn = nextName;
        nextName = null;
        Class<?> c = null;
        try {
            c = Class.forName(cn, false, loader);
        } catch (ClassNotFoundException x) {
            fail(service,
                 "Provider " + cn + " not found");
        }
        if (!service.isAssignableFrom(c)) {
            fail(service,
                 "Provider " + cn  + " not a subtype");
        }
        try {
            S p = service.cast(c.newInstance());
            providers.put(cn, p);
            return p;
        } catch (Throwable x) {
            fail(service,
                 "Provider " + cn + " could not be instantiated",
                 x);
        }
        throw new Error();          // This cannot happen
    }

    // ......
}

注意觀察hasNextService()和nextService()兩個(gè)方法:前者在前文所述SPI定義文件中逐個(gè)尋找對(duì)應(yīng)的服務(wù)提供類并加載資源,后者則通過反射創(chuàng)建服務(wù)提供類的實(shí)例,并緩存下來,直到完成整個(gè)發(fā)現(xiàn)與注入的流程,所以是懶加載的。由此也可得知,SPI機(jī)制內(nèi)部一定會(huì)遍歷所有的擴(kuò)展點(diǎn)并將它們?nèi)考虞d(主要缺點(diǎn))。

下面以JDBC和Flink為例簡單說說SPI的實(shí)際應(yīng)用。

JDBC中的SPI

JDBC是為用戶通過Java訪問數(shù)據(jù)庫提供的統(tǒng)一接口,而數(shù)據(jù)庫千變?nèi)f化,因此借助SPI機(jī)制可以靈活地實(shí)現(xiàn)數(shù)據(jù)庫驅(qū)動(dòng)的插件化。

在使用舊版JDBC時(shí),我們必須首先調(diào)用類似Class.forName("com.mysql.jdbc.Driver")的方法,通過反射來手動(dòng)加載數(shù)據(jù)庫驅(qū)動(dòng)。但是在新版JDBC中已經(jīng)不用寫了,只需直接調(diào)用DriverManager.getConnection()方法即可獲得數(shù)據(jù)庫連接。看一下java.sql.DriverManager的靜態(tài)代碼塊中調(diào)用的loadInitialDrivers()方法的部分代碼:

private static void loadInitialDrivers() {
    // ......
    
    AccessController.doPrivileged(new PrivilegedAction<Void>() {
        public Void run() {
            ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(Driver.class);
            Iterator<Driver> driversIterator = loadedDrivers.iterator();
            try{
                while(driversIterator.hasNext()) {
                    driversIterator.next();
                }
            } catch(Throwable t) { }
            return null;
        }
    });

    // ......
}

可見是利用SPI機(jī)制來獲取并加載驅(qū)動(dòng)提供類(java.sql.Driver接口的實(shí)現(xiàn)類)。以MySQL JDBC驅(qū)動(dòng)為例,在其META-INF/services目錄下找到名為java.sql.Driver的文件:

其內(nèi)容是:

com.mysql.jdbc.Driver
com.mysql.fabric.jdbc.FabricMySQLDriver

驅(qū)動(dòng)類都會(huì)調(diào)用DriverManager#registerDriver()方法注冊自身。如果加載了多個(gè)JDBC驅(qū)動(dòng)類(比如MySQL、PostgreSQL等等),獲取數(shù)據(jù)庫連接時(shí)會(huì)遍歷所有已經(jīng)注冊的驅(qū)動(dòng)實(shí)例,逐個(gè)調(diào)用其connect()方法嘗試是否能成功建立連接,并返回第一個(gè)成功的連接。具體可參看DriverManager#getConnection()方法。

Flink中的SPI

SPI機(jī)制在Flink的Table模塊中也有廣泛應(yīng)用——因?yàn)镕link Table的類型有很多種,同樣非常適合插件化。org.apache.flink.table.factories.TableFactory是Flink為我們提供的SPI工廠接口,在其注釋中也說明了這一點(diǎn)。

/**
 * A factory to create different table-related instances from string-based properties. This
 * factory is used with Java's Service Provider Interfaces (SPI) for discovering. A factory is
 * called with a set of normalized properties that describe the desired configuration. The factory
 * allows for matching to the given set of properties.
 *
 * <p>Classes that implement this interface can be added to the
 * "META_INF/services/org.apache.flink.table.factories.TableFactory" file of a JAR file in
 * the current classpath to be found.
 *
 * @see TableFormatFactory
 */
@PublicEvolving
public interface TableFactory {
    Map<String, String> requiredContext();
    List<String> supportedProperties();
}

以Flink-Hive Connector為例:

該文件的內(nèi)容為:

org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory
org.apache.flink.table.module.hive.HiveModuleFactory

那么Flink是如何保證正確的TableFactory實(shí)現(xiàn)類被加載的呢?一路追蹤方法調(diào)用鏈,來到TableFactoryService#findSingleInternal()方法。

private static <T extends TableFactory> T findSingleInternal(
        Class<T> factoryClass,
        Map<String, String> properties,
        Optional<ClassLoader> classLoader) {
    List<TableFactory> tableFactories = discoverFactories(classLoader);
    List<T> filtered = filter(tableFactories, factoryClass, properties);

    if (filtered.size() > 1) {
        throw new AmbiguousTableFactoryException(
            filtered,
            factoryClass,
            tableFactories,
            properties);
    } else {
        return filtered.get(0);
    }
}

其中,discoverFactories()方法用來發(fā)現(xiàn)并加載Table的服務(wù)提供類,filter()方法則用來過濾出滿足當(dāng)前應(yīng)用需要的服務(wù)提供類。前者最終調(diào)用了ServiceLoader的相關(guān)方法,如下:

private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) {
    try {
        List<TableFactory> result = new LinkedList<>();
        ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader());
        ServiceLoader
            .load(TableFactory.class, cl)
            .iterator()
            .forEachRemaining(result::add);
        return result;
    } catch (ServiceConfigurationError e) {
        LOG.error("Could not load service provider for table factories.", e);
        throw new TableException("Could not load service provider for table factories.", e);
    }
}

過濾邏輯相對(duì)復(fù)雜,時(shí)間已經(jīng)晚了,就不再廢話了(

The End

民那晚安晚安。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容