suricata 支持Kafka輸出

前提:

源代碼修改可以參考:
https://github.com/CosmosSun/suricata/tree/kafka-feature-3105-v5
C語言寫入kafka可參考:
https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_example.c
編譯時安裝依賴并開啟Kafka支持即可:

yum install librdkafka-devel

./configure --enable-rdkafka

修改:

加入其他配置及Kafka監(jiān)控配置

1、在上述已經(jīng)修改的基礎(chǔ)上修改:util-log-kafka.h 文件如下:

#ifndef __UTIL_LOG_KAFKA_H__
#define __UTIL_LOG_KAFKA_H__

#ifdef HAVE_LIBRDKAFKA
#include <librdkafka/rdkafka.h>


typedef struct KafkaSetup{
    const char *brokers;
    const char *topic_name;
    const char *broker_version;
    const char *queue_buff_max_kbytes;
    const char *queue_buff_max_ms;
    const char *queue_buff_max_messages;
    const char *stat_int_ms;
    const char *compression_codec;
    const char *batch_num_messages;
    const char *message_max_bytes;
    int partitions;
}KafkaSetup;

typedef struct SCLogKafkaContext{
    rd_kafka_t *rk;
    rd_kafka_topic_t *rkt;
    long partition;
}SCLogKafkaContext;

int LogFileWriteKafka(void *lf_ctx, const char *string, size_t string_len);
int SCConfLogOpenKafka(ConfNode *kafka_node, void *lf_ctx);

#endif

#endif

2、util-log-kafka.c 文件如下:


#include "suricata-common.h"
#include "util-log-kafka.h"
#include "util-logopenfile.h"

#ifdef HAVE_LIBRDKAFKA

/* Partition used for produced messages: RD_KAFKA_PARTITION_UA lets the
 * librdkafka partitioner pick one ("unassigned"). */
static int partition = RD_KAFKA_PARTITION_UA;

/** \brief Flush pending messages and tear down the kafka producer.
 *  \param log_ctx Log file context owning the kafka context
 */
static void SCLogFileCloseKafka(LogFileCtx *log_ctx)
{
    SCLogKafkaContext *kafka_ctx = log_ctx->kafka;

    if (NULL == kafka_ctx) {
        return;
    }

    if (kafka_ctx->rk) {
        /* Give librdkafka up to 10s to deliver everything still queued. */
        rd_kafka_flush(kafka_ctx->rk, 10*1000);
        /* Keep serving delivery callbacks until the out-queue drains. */
        while (rd_kafka_outq_len(kafka_ctx->rk) > 0)
            rd_kafka_poll(kafka_ctx->rk, 100);
    }

    if (kafka_ctx->rkt) {
        /* Destroy topic */
        rd_kafka_topic_destroy(kafka_ctx->rkt);
    }

    if (kafka_ctx->rk) {
        /* Destroy the handle */
        rd_kafka_destroy(kafka_ctx->rk);
    }

    /* BUG FIX: the context allocated in SCConfLogOpenKafka() was leaked
     * here and the stale pointer left in log_ctx. */
    SCFree(kafka_ctx);
    log_ctx->kafka = NULL;
    return;
}

/**
 * \brief LogFileWriteKafka() writes log data to kafka output.
 * \param lf_ctx Log file context allocated by caller (a LogFileCtx *)
 * \param string buffer with data to write
 * \param string_len data length
 * \retval 0 on success;
 * \retval -1 on failure;
 */
int LogFileWriteKafka(void *lf_ctx, const char *string, size_t string_len)
{
    LogFileCtx *log_ctx = lf_ctx;
    SCLogKafkaContext *kafka_ctx = log_ctx->kafka;

retry:
    if (rd_kafka_produce(kafka_ctx->rkt, partition,
            RD_KAFKA_MSG_F_COPY,
            /* Payload and length */
            (void *)string, string_len,
            /* Optional key and its length */
            NULL, 0,
            /* Message opaque, provided in delivery report callback as
             * msg_opaque. */
            NULL) == -1) {
        if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
            /* The internal queue -- messages waiting to be sent plus
             * those in flight, bounded by queue.buffering.max.messages --
             * is full: serve delivery reports to drain it, then retry. */
            rd_kafka_poll(kafka_ctx->rk, 100);
            goto retry;
        }
        SCLogError(SC_ERR_KAFKA,
            "%% Failed to produce to topic %s "
            "partition %i: %s\n",
            log_ctx->kafka_setup.topic_name, partition,
            rd_kafka_err2str(rd_kafka_last_error()));
        /* BUG FIX: only the failure path returns -1 now; the original
         * returned -1 unconditionally, contradicting its own contract. */
        return -1;
    }
    /* Serve queued delivery callbacks without blocking. */
    rd_kafka_poll(kafka_ctx->rk, 0);
    return 0;
}

/* Delivery report callback: invoked from rd_kafka_poll()/rd_kafka_flush()
 * exactly once per produced message, after it was acked or failed. */
static void dr_msg_cb(rd_kafka_t *rk,
                      const rd_kafka_message_t *rkmessage, void *opaque){
    if (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR) {
        SCLogError(SC_ERR_KAFKA,"%% Message delivery failed: %s\n",
                rd_kafka_err2str(rkmessage->err));
    }
}
/* Statistics callback: fired every statistics.interval.ms with a JSON
 * snapshot (see librdkafka STATISTICS.md); mirrored to stderr for easy
 * troubleshooting.  Returning 0 lets librdkafka free the json buffer. */
static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque){
    fprintf(stderr, "%% stats delivery : %s\n",json);
    SCLogInfo("%% stats delivery : %s\n",json);
    return 0;
}

/** \brief configure and initializes kafka output logging
 *  \param kafka_node ConfNode structure for the output section in question
 *  \param lf_ctx Log file context allocated by caller
 *  \retval 0 on success
 */
int SCConfLogOpenKafka(ConfNode *kafka_node, void *lf_ctx)
{
    LogFileCtx *log_ctx = lf_ctx;
    const char *partitions = NULL;
    SCLogKafkaContext *kafka_ctx = NULL;

    if (NULL == kafka_node) {
        return -1;
    }

    log_ctx->kafka_setup.brokers = ConfNodeLookupChildValue(kafka_node, "brokers");
    log_ctx->kafka_setup.topic_name = ConfNodeLookupChildValue(kafka_node, "topic");
    log_ctx->kafka_setup.broker_version = ConfNodeLookupChildValue(kafka_node, "broker.version.fallback");
    log_ctx->kafka_setup.queue_buff_max_kbytes = ConfNodeLookupChildValue(kafka_node, "queue.buffering.max.kbytes");
    log_ctx->kafka_setup.queue_buff_max_ms = ConfNodeLookupChildValue(kafka_node, "queue.buffering.max.ms");
    log_ctx->kafka_setup.queue_buff_max_messages = ConfNodeLookupChildValue(kafka_node, "queue.buffering.max.messages");
    log_ctx->kafka_setup.stat_int_ms = ConfNodeLookupChildValue(kafka_node, "statistics.interval.ms");
    log_ctx->kafka_setup.batch_num_messages = ConfNodeLookupChildValue(kafka_node, "batch.num.messages");
    log_ctx->kafka_setup.message_max_bytes = ConfNodeLookupChildValue(kafka_node, "message.max.bytes");
    log_ctx->kafka_setup.compression_codec = ConfNodeLookupChildValue(kafka_node, "compression.codec");
    partitions =  ConfNodeLookupChildValue(kafka_node, "partitions");
    log_ctx->kafka_setup.partitions = atoi(partitions);

    /*create kafka ctx*/
    rd_kafka_conf_t *conf;
    rd_kafka_topic_conf_t *topic_conf;
    char tmp[16];
    char errstr[512];
    kafka_ctx = (SCLogKafkaContext*) SCCalloc(1, sizeof(SCLogKafkaContext));
    if (kafka_ctx == NULL) {
        SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate kafka context");
        exit(EXIT_FAILURE);
    }

    conf = rd_kafka_conf_new();
    snprintf(tmp, sizeof(tmp), "%i", SIGIO);
    if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
        "internal.termination.signal",
        tmp,
        errstr,
        sizeof(errstr))) {
        SCLogError(SC_ERR_KAFKA, "Unable to allocate kafka context");
        exit(EXIT_FAILURE);
    }
    if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
        "broker.version.fallback",
        log_ctx->kafka_setup.broker_version,
        errstr,
        sizeof(errstr))) {
        SCLogError(SC_ERR_KAFKA, "%s", errstr);
        exit(EXIT_FAILURE);
    }
    if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
        "queue.buffering.max.messages",
        log_ctx->kafka_setup.queue_buff_max_messages,
        errstr,
        sizeof(errstr))) {
        SCLogError(SC_ERR_KAFKA, "%s", errstr);
        exit(EXIT_FAILURE);
    }
     if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
        "queue.buffering.max.ms",
        log_ctx->kafka_setup.queue_buff_max_ms,
        errstr,
        sizeof(errstr))) {
        SCLogError(SC_ERR_KAFKA, "%s", errstr);
        exit(EXIT_FAILURE);
    }
    if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
        "compression.codec",
        log_ctx->kafka_setup.compression_codec,
        errstr,
        sizeof(errstr))) {
        SCLogError(SC_ERR_KAFKA, "%s", errstr);
        exit(EXIT_FAILURE);
    }
    if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
        "queue.buffering.max.kbytes",
        log_ctx->kafka_setup.queue_buff_max_kbytes,
        errstr,
        sizeof(errstr))) {
        SCLogError(SC_ERR_KAFKA, "%s", errstr);
        exit(EXIT_FAILURE);
    }
    if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
        "batch.num.messages",
        log_ctx->kafka_setup.batch_num_messages,
        errstr,
        sizeof(errstr))) {
        SCLogError(SC_ERR_KAFKA, "%s", errstr);
        exit(EXIT_FAILURE);
    }
     if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
        "message.max.bytes",
        log_ctx->kafka_setup.message_max_bytes,
        errstr,
        sizeof(errstr))) {
        SCLogError(SC_ERR_KAFKA, "%s", errstr);
        exit(EXIT_FAILURE);
    }
     if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
        "statistics.interval.ms",
        log_ctx->kafka_setup.stat_int_ms,
        errstr,
        sizeof(errstr))) {
        SCLogError(SC_ERR_KAFKA, "%s", errstr);
        exit(EXIT_FAILURE);
    }
    rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
    rd_kafka_conf_set_stats_cb(conf,stats_cb);
    if (!(kafka_ctx->rk = rd_kafka_new(RD_KAFKA_PRODUCER,
        conf,
        errstr,
        sizeof(errstr)))) {
        rd_kafka_conf_destroy(conf);
        SCLogError(SC_ERR_KAFKA, "%% Failed to create new producer: %s", errstr);
        exit(EXIT_FAILURE);
    }
    if (0 == rd_kafka_brokers_add(kafka_ctx->rk,
        log_ctx->kafka_setup.brokers)) {
        rd_kafka_destroy(kafka_ctx->rk);
        SCLogError(SC_ERR_KAFKA, "%% No valid brokers specified");
        exit(EXIT_FAILURE);
    }
    topic_conf = rd_kafka_topic_conf_new();
    kafka_ctx->rkt = rd_kafka_topic_new(kafka_ctx->rk,
        log_ctx->kafka_setup.topic_name,
        topic_conf);
    if (NULL == kafka_ctx->rkt) {
        rd_kafka_destroy(kafka_ctx->rk);
        SCLogError(SC_ERR_KAFKA, "%% Failed to create kafka topic %s",
            log_ctx->kafka_setup.topic_name);
        exit(EXIT_FAILURE);
    }

    kafka_ctx->partition = 0;
    log_ctx->kafka = kafka_ctx;
    log_ctx->Close = SCLogFileCloseKafka;

    return 0;
}
#endif

上述代碼不知道修改了哪些地方可以和原版的(https://github.com/CosmosSun/suricata/tree/kafka-feature-3105-v5)對比下。主要是添加了部分Kafka配置以及監(jiān)控統(tǒng)計(方便排查問題)。將statistics.interval.ms 設(shè)置為零 將關(guān)閉統(tǒng)計輸出。
kafka配置可以參考:https://blog.csdn.net/qq_34284638/article/details/97641038
kafka監(jiān)控輸出的數(shù)據(jù)為json,參數(shù)意思可以參考:
https://github.com/edenhill/librdkafka/blob/bb96705083b16d773cd15ef64880b605d82c5a1a/STATISTICS.md

遇到的問題

服務(wù)端版本為 0.11.0 時,運行一會兒就輸出下面的錯誤;在 0.10.0 下沒有這個錯誤。

 Receive failed: Connection reset by peer
這通常不是真正的故障:0.11.0 起 broker 會按 connections.max.idle.ms(默認 10 分鐘)主動關(guān)閉空閑連接,librdkafka 重連時就會記錄這條日志。可以在 librdkafka 配置中設(shè)置 log.connection.close=false 屏蔽該日志,或確認消息并未丟失后忽略它。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容