單元測試之embedded-kafka

在項目中,團隊也使用了Kafka作為消息中間件。 經(jīng)過了嵌入式redis選型的問題之后,筆者在嵌入式kafka選型時就更傾向于還在持續(xù)更新,并且維護人員是一個團隊而不是個人或者松散的組織。 最終,筆者選擇了來自salesforce的開源項目

<groupId>com.salesforce.kafka.test</groupId>
    <artifactId>kafka-junit</artifactId>
    <version>3.0.1</version>

以下是在項目自帶的測試用例代碼上稍加修改的案例。

package com.salesforce.kafka.test.junit4;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.util.Collection;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.common.Node;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.salesforce.kafka.test.KafkaBroker;
import com.salesforce.kafka.test.KafkaTestServer;

public class KafkaBrokerTest {
    private static final Logger logger = LoggerFactory.getLogger(KafkaBrokerTest.class);

    /**
     * Validate that we started 2 brokers.
     * @throws Exception 
     */
    @Test
    public void testTwoBrokersStarted() throws Exception {
        Properties overrideProperties = new Properties();
        overrideProperties.put("broker.id", "1");
        overrideProperties.put("port", "6666");
        KafkaTestServer server1= new KafkaTestServer(overrideProperties);
        server1.start();
       String string= server1.getKafkaBrokers().getBrokerById(1).getConnectString();
        logger.info("\n"+string+"\n");
         
        overrideProperties.put("broker.id", "2");
        overrideProperties.put("port", "8888");
        KafkaTestServer server2= new KafkaTestServer(overrideProperties);
        server2.start();
       String string2= server2.getKafkaBrokers().getBrokerById(2).getConnectString();
        logger.info("\n"+string2+"\n");

    }

}

問題

從上述案例中可以看出,在實際的kafka使用中,IP+端口號是每個kafka broker都不一樣的。但是在salesforce/kafka-core中提供的KafkaTestCluster類中,其并沒有給外部來指定某個broker port的機會。

 /**
     * Starts the cluster.
     * @throws Exception on startup errors.
     * @throws TimeoutException When the cluster fails to start up within a timely manner.
     */
    public void start() throws Exception, TimeoutException {
        // Ensure zookeeper instance has been started.
        zkTestServer.start();

        // If we have no brokers defined yet...
        if (brokers.isEmpty()) {
            // Loop over brokers, starting with brokerId 1.
            for (int brokerId = 1; brokerId <= numberOfBrokers; brokerId++) {
                // Create properties for brokers
                final Properties brokerProperties = new Properties();

                // Add user defined properties.
                brokerProperties.putAll(overrideBrokerProperties);

                // Set broker.id
                brokerProperties.put("broker.id", String.valueOf(brokerId));

                // Create new KafkaTestServer and add to our broker list
                brokers.add(
                    new KafkaTestServer(brokerProperties, zkTestServer)
                );
            }
        }

        // Loop over each broker and start it
        for (final KafkaTestServer broker : brokers) {
            broker.start();
        }

        // Block until the cluster is 'up' or the timeout is exceeded.
        waitUntilClusterReady(10_000L);
    }

這在某些需要預(yù)先指定IP+端口號的場景中還是有一些麻煩的。需要后續(xù)進行處理。例如,給這個類新增一個構(gòu)造方法,利用以下的List ,把已經(jīng)完成初始化的List<KafkaTestServer> brokers 作為入?yún)鬟f進去。

private final List<KafkaTestServer> brokers = new ArrayList<>();

新增構(gòu)造方法:

public KafkaTestCluster(final List<KafkaTestServer> brokers) 
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容