PulsarClient Deep Dive (Part 1)

PulsarClient

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

Let's look at the main methods of this class, which fall into a few groups:


  • creating producers / consumers / readers

  • metadata-related operations

  • transaction-related operations

  • close methods


ClientBuilder

A builder is provided to pass configuration into PulsarClient.

Supported configuration options (a builder sketch follows the list):

  1. Connection-related:

    • connection addresses: serviceUrl / serviceUrlProvider / listener / proxyServiceUrl

    • operation timeout: operationTimeout

    • TCP settings:

      • tcpNoDelay

      • keepAliveInterval

      • connection establishment timeout: connectionTimeout

      • how many connections to open per broker

    • request retry policy (how long to back off after a failed request)

  2. Lookup request settings:

    • lookup request concurrency

    • maximum number of redirects

    • maximum number of rejected requests per connection

  3. Thread counts:

    • ioThreads

    • listenerThreads

  4. TLS + authentication

  5. Transactions

  6. Metrics
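
A sketch of setting a few of these options through the builder (method names follow the Pulsar 2.x ClientBuilder API; the values are illustrative, not recommendations):

// TimeUnit is java.util.concurrent.TimeUnit
PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        // connection-related
        .operationTimeout(30, TimeUnit.SECONDS)
        .connectionTimeout(10, TimeUnit.SECONDS)
        .connectionsPerBroker(1)                    // connections per broker
        .enableTcpNoDelay(true)
        .keepAliveInterval(30, TimeUnit.SECONDS)
        // lookup-related
        .maxConcurrentLookupRequests(5000)
        .maxNumberOfRejectedRequestPerConnection(50)
        // thread counts
        .ioThreads(1)
        .listenerThreads(1)
        .build();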

Builder.build simply hands the assembled configuration to the PulsarClientImpl constructor.

Let's see what that constructor does.

PulsarClientImpl

package org.apache.pulsar.client.impl;

public class PulsarClientImpl implements PulsarClient {

    // Lookup service
    private LookupService lookup;

    // Connection pool
    private final ConnectionPool cnxPool;

    // netty's HashedWheelTimer, used to schedule delayed operations
    private final Timer timer;
    private final ExecutorProvider externalExecutorProvider;
    private final ExecutorProvider internalExecutorService;

    // Current state of this PulsarClient
    private AtomicReference<State> state = new AtomicReference<>();

    // All business-logic units (client-side logic)
    private final Set<ProducerBase<?>> producers;
    private final Set<ConsumerBase<?>> consumers;

    // ID generators
    private final AtomicLong producerIdGenerator = new AtomicLong();
    private final AtomicLong consumerIdGenerator = new AtomicLong();
    private final AtomicLong requestIdGenerator = new AtomicLong();

    // The EventLoopGroup here appears to be used purely as a thread pool:
    // 0. passed to ConnectionPool as the connections' I/O thread pool (standard netty client usage)
    // 1. used by consumers to periodically flush the PersistentAcknowledgmentsGroupingTracker
    // 2. used by producers to periodically generate encryption keys
    // 3. passed as a constructor argument to AsyncHttpClient
    private final EventLoopGroup eventLoopGroup;

    // Cache of schema providers
    private final LoadingCache<String, SchemaInfoProvider> schemaProviderLoadingCache;

    // Used by producers to generate the publish time
    private final Clock clientClock;

    @Getter
    private TransactionCoordinatorClientImpl tcClient;

The constructor mainly initializes these key fields; nothing else special happens there.

Depending on configuration, LookupService is either an HttpLookupService or a BinaryProtoLookupService.
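
A minimal sketch of that choice, simplified from the PulsarClientImpl constructor (the exact constructor arguments vary between Pulsar versions):

// an http(s) serviceUrl selects the REST-based lookup; a pulsar:// URL
// selects the binary-protocol lookup that goes through ClientCnx
if (conf.getServiceUrl().startsWith("http")) {
    lookup = new HttpLookupService(conf, eventLoopGroup);
} else {
    lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(),
            conf.isUseTls(), externalExecutorProvider.getExecutor());
}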


ConnectionPool

Let's look at ConnectionPool first.

package org.apache.pulsar.client.impl;

public class ConnectionPool implements Closeable {

    // The pool itself, holding the connections:
    // address -> connection index -> connection
    // if maxConnectionsPerHosts is configured as 0, pooling is disabled
    protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;

    // netty-related members, passed in from PulsarClient
    private final EventLoopGroup eventLoopGroup;
    private final Bootstrap bootstrap;
    private final PulsarChannelInitializer channelInitializerHandler;
    protected final DnsNameResolver dnsResolver;

    // Configuration
    private final ClientConfigurationData clientConfig;
    private final int maxConnectionsPerHosts;

    // Whether this is a Server Name Indication (SNI) proxy; TLS-related, ignored for now
    private final boolean isSniProxy;


The constructor mainly initializes these members in the standard netty-client fashion:

        bootstrap = new Bootstrap();
        // bind the I/O thread pool
        bootstrap.group(eventLoopGroup);
        // configure the channel type; with Epoll support this becomes an Epoll channel
        bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
        // set the TCP connect timeout
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getConnectionTimeoutMs());
        // set TCP no-delay
        bootstrap.option(ChannelOption.TCP_NODELAY, conf.isUseTcpNoDelay());
        // configure the allocator
        bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
        // bind the channel initializer
        channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier);
        bootstrap.handler(channelInitializerHandler);
        // this resolver is provided by netty for DNS resolution; more on it later
        this.dnsResolver = new DnsNameResolverBuilder(eventLoopGroup.next()).traceEnabled(true)
                .channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup)).build();
    }

這里面?zhèn)魅氲腂ufferPool是一個(gè)自定義的

The pool's main responsibilities:

  1. create and cache connections

  2. release connections

  3. cap the number of connections per host according to maxConnectionsPerHosts

For concrete usage, see org.apache.pulsar.client.impl.ConnectionPoolTest:

ConnectionPool pool;
InetSocketAddress brokerAddress = ....;

// get a connection; one is created if none exists yet
CompletableFuture<ClientCnx> conn = pool.getConnection(brokerAddress);
ClientCnx cnx = conn.get();

// use the connection
...

// give it back to the pool
pool.releaseConnection(cnx);

pool.closeAllConnections();
pool.close();

Let's first look at PulsarChannelInitializer, which sets up the connection to the Pulsar broker.

public void initChannel(SocketChannel ch) throws Exception {

    // TLS-related
    ch.pipeline().addLast("ByteBufPairEncoder", tlsEnabled ? ByteBufPair.COPYING_ENCODER : ByteBufPair.ENCODER);

    // length-field-based frame decoder: each frame is prefixed with a 4-byte
    // length field (offset 0, length 4), which is stripped before the frame
    // is passed downstream
    ch.pipeline().addLast("frameDecoder",
                          new LengthFieldBasedFrameDecoder(
            Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));

    // from here on we receive complete frames, ready to be deserialized into
    // RPC objects; all client-side logic is actually handled in ClientCnx
    ch.pipeline().addLast("handler", clientCnxSupplier.get());
}
Connection creation logic (connectToAddress)

This boils down to netty's bootstrap.connect (TLS aside).


ClientCnx

Let's look at the class hierarchy:

public class ClientCnx extends PulsarHandler;
public abstract class PulsarHandler extends PulsarDecoder;
public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter;

PulsarDecoder

Recall that when the channel was initialized, a LengthFieldBasedFrameDecoder was placed in front of PulsarDecoder.

By the time channelRead runs, the buffer therefore holds one complete frame; the method deserializes it into an RPC command and dispatches to the matching handler method (handleXXX):

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ...

        // Get a buffer that contains the full frame
        ByteBuf buffer = (ByteBuf) msg;
        BaseCommand cmd = null;
        BaseCommand.Builder cmdBuilder = null;
        try {
            // De-serialize the command
            int cmdSize = (int) buffer.readUnsignedInt();
            int writerIndex = buffer.writerIndex();
            buffer.writerIndex(buffer.readerIndex() + cmdSize);

            // take a ByteBufCodedInputStream from the object pool
            ByteBufCodedInputStream cmdInputStream = ByteBufCodedInputStream.get(buffer);
            cmdBuilder = BaseCommand.newBuilder();
            // deserialize
            cmd = cmdBuilder.mergeFrom(cmdInputStream, null).build();
            buffer.writerIndex(writerIndex);

            cmdInputStream.recycle();

            ...
            // dispatch to a different handler method depending on the RPC type
            switch (cmd.getType()) {
            case PARTITIONED_METADATA:
                checkArgument(cmd.hasPartitionMetadata());
                try {
                    interceptCommand(cmd);
                    handlePartitionMetadataRequest(cmd.getPartitionMetadata());
                } catch (InterceptException e) {
                    ctx.writeAndFlush(Commands.newPartitionMetadataResponse(getServerError(e.getErrorCode()),
                            e.getMessage(), cmd.getPartitionMetadata().getRequestId()));
                } finally {
                    cmd.getPartitionMetadata().recycle();
                }
                break;
            ...
            // other RPC types omitted; they all follow the same handleXXX pattern
        } finally {
            // cleanup
            if (cmdBuilder != null) {
                cmdBuilder.recycle();
            }

            if (cmd != null) {
                cmd.recycle();
            }

            buffer.release();
        }
    }

PulsarHandler

This class mainly adds the keep-alive logic on top of PulsarDecoder.

The relevant methods are straightforward; a paraphrased sketch is shown below.
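
A simplified sketch of the keep-alive mechanism (paraphrased from PulsarHandler; field and method names follow the source, error handling omitted). On each tick, the handler either sends a ping or, if the previous ping was never answered, closes the connection:

// paraphrased from PulsarHandler: periodic ping/pong with a one-tick deadline
private void handleKeepAliveTimeout() {
    if (!ctx.channel().isOpen()) {
        return;
    }
    if (waitingForPingResponse) {
        // the peer never answered the previous ping: consider the connection dead
        ctx.close();
    } else {
        waitingForPingResponse = true;
        ctx.writeAndFlush(Commands.newPing());
    }
}

// counterpart handlers inherited from PulsarDecoder
@Override
protected void handlePing(CommandPing ping) {
    ctx.writeAndFlush(Commands.newPong()); // answer the peer's ping
}

@Override
protected void handlePong(CommandPong pong) {
    waitingForPingResponse = false; // our ping was answered in time
}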


ClientCnx

This is where the actual interaction with the server happens.

package org.apache.pulsar.client.impl;


public class ClientCnx extends PulsarHandler {


    // Connection state
    enum State {
        None, SentConnectFrame, Ready, Failed, Connecting
    }
    private State state;

   //----------------------------------------------------------------------

    // In-flight request table:
    // requestId -> pending request
    private final ConcurrentLongHashMap<CompletableFuture<? extends Object>> pendingRequests =
        new ConcurrentLongHashMap<>(16, 1);

    // Queue of lookup requests waiting to be sent
    private final Queue<Pair<Long, Pair<ByteBuf, CompletableFuture<LookupDataResult>>>> waitingLookupRequests;

   //----------------------------------------------------------------------

    // Business-logic units registered on this connection
    private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new ConcurrentLongHashMap<>(16, 1);
    private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new ConcurrentLongHashMap<>(16, 1);
    private final ConcurrentLongHashMap<TransactionMetaStoreHandler> transactionMetaStoreHandlers = new ConcurrentLongHashMap<>(16, 1);

   //----------------------------------------------------------------------

    // Handle for the asynchronous connection setup
    private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>();

   //----------------------------------------------------------------------

    // Thread pool passed in when PulsarClient was constructed
    private final EventLoopGroup eventLoopGroup;

   //----------------------------------------------------------------------

    // Flow control (lookup-related)
    private final Semaphore pendingLookupRequestSemaphore;
    private final Semaphore maxLookupRequestSemaphore;

    // Members related to rejected requests (lookup-related)
    private final int maxNumberOfRejectedRequestPerConnection;
    private final int rejectedRequestResetTimeSec = 60;
    // Number of rejected requests (lookup-related)
    private static final AtomicIntegerFieldUpdater<ClientCnx> NUMBER_OF_REJECTED_REQUESTS_UPDATER = AtomicIntegerFieldUpdater
            .newUpdater(ClientCnx.class, "numberOfRejectRequests");
    @SuppressWarnings("unused")
    private volatile int numberOfRejectRequests = 0;

    //----------------------------------------------------------------------
    // Data structure used to track request timeouts
    private static class RequestTime {
        final long creationTimeMs;
        final long requestId;
        final RequestType requestType;

        RequestTime(long creationTime, long requestId, RequestType requestType) {
            this.creationTimeMs = creationTime;
            this.requestId = requestId;
            this.requestType = requestType;
        }
    }

    // Queue of requests awaiting the timeout check
    private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new ConcurrentLinkedQueue<>();

   //----------------------------------------------------------------------

    // Maximum message size
    @Getter
    private static int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;

    // RPC protocol version
    private final int protocolVersion;

    // Operation timeout
    private final long operationTimeoutMs;
    // Handle of the task that checks for operation timeouts
    private ScheduledFuture<?> timeoutTask;

   //----------------------------------------------------------------------

    // Whether we are connecting through a proxy
    protected String proxyToTargetBrokerAddress = null;
    protected String remoteHostName = null;

    // TLS-related
    private boolean isTlsHostnameVerificationEnable;
    private static final TlsHostnameVerifier HOSTNAME_VERIFIER = new TlsHostnameVerifier();
    protected final Authentication authentication;
    protected AuthenticationDataProvider authenticationDataProvider;

   //----------------------------------------------------------------------

    // Transaction-related
    private TransactionBufferHandler transactionBufferHandler;


    private enum RequestType {
        Command,
        GetLastMessageId,
        GetTopics,
        GetSchema,
        GetOrCreateSchema;

        String getDescription() {
            if (this == Command) {
                return "request";
            } else {
                return name() + " request";
            }
        }
    }

Let's briefly step back into ConnectionPool. Creating a connection actually goes through Bootstrap.connect, which returns a netty Channel, yet ConnectionPool hands back a ClientCnx. The bridge is the channel pipeline:

ConnectionPool

private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalAddress,
            InetSocketAddress physicalAddress, int connectionKey) {

        final CompletableFuture<ClientCnx> cnxFuture = new CompletableFuture<ClientCnx>();

        // Trigger async connect to broker
        createConnection(physicalAddress).thenAccept(channel -> {
            ....
            // the ClientCnx object is fetched from the pipeline of the
            // successfully connected Channel
            final ClientCnx cnx = (ClientCnx) channel.pipeline().get("handler");
            ....

            if (!logicalAddress.equals(physicalAddress)) {
                // We are connecting through a proxy. We need to set the target broker in the ClientCnx object so that
                // it can be specified when sending the CommandConnect.
                // That phase will happen in the ClientCnx.connectionActive() which will be invoked immediately after
                // this method.
                cnx.setTargetBroker(logicalAddress);
            }

            // remember the remote address
            cnx.setRemoteHostName(physicalAddress.getHostName());

            cnx.connectionFuture().thenRun(() -> {
                ...
                // connection succeeded: complete with the ClientCnx
                cnxFuture.complete(cnx);
            }).exceptionally(exception -> {

                cnxFuture.completeExceptionally(exception);
                cleanupConnection(logicalAddress, connectionKey, cnxFuture);
                cnx.ctx().close();
                return null;
            });

           ...

The main methods (responsibilities) of ClientCnx:
  • connection lifecycle management (the netty handler callbacks)

    • channelActive

    • channelInactive

    • exceptionCaught

    • ......

  • sending requests: methods that actively send RPCs and apply the corresponding business logic

    • lookup requests

    • getLastMessageId

    • getSchema

    • .....

  • handling responses: the handleXXX RPC handlers inherited from PulsarDecoder

  • sending an RPC and returning the raw response:

CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId, RequestType requestType)

  • checking whether requests have timed out: checkRequestTimeout

  • registering / removing business-logic objects (covered in separate articles later)

    • consumer

    • producer

    • transactionMetaStoreHandler

    • transactionBufferHandler


The sendRequestAndHandleTimeout method:

private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId, RequestType requestType) {

        // put the future into the pending-request table to wait for the response
        CompletableFuture<T> future = new CompletableFuture<>();
        pendingRequests.put(requestId, future);

        // send the RPC body directly
        ctx.writeAndFlush(requestMessage).addListener(writeFuture -> {
            if (!writeFuture.isSuccess()) {
                log.warn("{} Failed to send {} to broker: {}", ctx.channel(), requestType.getDescription(), writeFuture.cause().getMessage());
                pendingRequests.remove(requestId);
                future.completeExceptionally(writeFuture.cause());
            }
        });
        // record an entry in the timeout queue so this request's deadline can be checked
        requestTimeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId, requestType));

        return future;
    }
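
For reference, the public entry points are thin wrappers around this method; two examples paraphrased from ClientCnx (the lookup variant additionally acquires pendingLookupRequestSemaphore for flow control before delegating):

// paraphrased from ClientCnx: callers never invoke
// sendRequestAndHandleTimeout directly, they go through wrappers like these
public CompletableFuture<MessageIdData> sendGetLastMessageId(ByteBuf request, long requestId) {
    return sendRequestAndHandleTimeout(request, requestId, RequestType.GetLastMessageId);
}

public CompletableFuture<CommandGetSchemaResponse> sendGetRawSchema(ByteBuf request, long requestId) {
    return sendRequestAndHandleTimeout(request, requestId, RequestType.GetSchema);
}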

The channelActive method

This method's logic is fairly simple (a paraphrased sketch follows the list):

  • PulsarHandler.channelActive starts the scheduled keep-alive task

  • ClientCnx.channelActive starts the scheduled request-timeout task

  • it then sends a ConnectCommand to the server (the server-side handling will be covered later)
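
A paraphrased sketch of ClientCnx.channelActive (simplified; logging and the proxy branch omitted):

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    super.channelActive(ctx); // PulsarHandler starts the keep-alive task here

    // schedule the periodic request-timeout check
    this.timeoutTask = this.eventLoopGroup.scheduleAtFixedRate(
            () -> checkRequestTimeout(), operationTimeoutMs, operationTimeoutMs, TimeUnit.MILLISECONDS);

    // kick off the handshake by sending CommandConnect
    ctx.writeAndFlush(newConnectCommand()).addListener(future -> {
        if (future.isSuccess()) {
            state = State.SentConnectFrame;
        } else {
            ctx.close();
        }
    });
}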


Request timeout handling

This logic is also straightforward.

A periodic task scheduled on the EventLoopGroup walks requestTimeoutQueue looking for requests that have timed out; each one found has its response future completed with a TimeoutException.

The check interval is determined by operationTimeoutMs.
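
A paraphrased sketch of checkRequestTimeout (simplified from the source). Since entries are appended in creation order, the loop only ever needs to look at the head of the queue:

private void checkRequestTimeout() {
    while (true) {
        RequestTime request = requestTimeoutQueue.peek();
        if (request == null
                || (System.currentTimeMillis() - request.creationTimeMs) < operationTimeoutMs) {
            // queue empty, or the oldest entry hasn't timed out yet;
            // everything behind it is newer, so we can stop
            break;
        }
        request = requestTimeoutQueue.poll();
        CompletableFuture<?> requestFuture = pendingRequests.remove(request.requestId);
        if (requestFuture != null && !requestFuture.isDone()) {
            // fail the caller's future with a TimeoutException
            requestFuture.completeExceptionally(new PulsarClientException.TimeoutException(
                    request.requestType.getDescription() + " timed out after " + operationTimeoutMs + " ms"));
        }
    }
}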


PulsarClient: functionality recap

Let's recap the overall functionality of PulsarClient:

  • a connection pool that creates ClientCnx objects for talking to the server

  • a registry of business-logic units (consumer, producer, tcClient)

  • a LookupService

  • some periodically scheduled checks

  • a LoadingCache for schemas

Business-logic units register themselves on a ClientCnx; through that connection they can send RPCs and receive responses, which are routed back into the unit.

To its users, PulsarClient provides an RPC-level abstraction; the other classes build their own logic on top of those RPCs.
