Flink 1.20 Custom SQL Connector Tutorial: A Complete HTTP Connector Implementation

A complete walkthrough of developing custom Table/SQL connectors for Apache Flink 1.20, with full HTTP connector source code and examples

1. Overview

1.1 What Is a Custom Connector?

A Flink connector is the bridge between Flink and an external system:

  • Source: reads data from the external system
  • Sink: writes data to the external system

1.2 Use Cases for an HTTP Connector

  • Calling RESTful APIs to fetch real-time data
  • Polling HTTP endpoints for updates
  • POSTing processed results to a webhook
  • Integrating with third-party data services

2. Core Architecture

2.1 Component Relationships

SQL/Table API
     ↓
Factory (factory class)
     ↓
DynamicTableSource/Sink
     ↓
SourceFunction/SinkFunction
     ↓
External HTTP API

2.2 Core Interfaces

Interface                      Purpose
DynamicTableSourceFactory      Creates the Source
DynamicTableSinkFactory        Creates the Sink
ScanTableSource                Scan-based table source
RichSourceFunction<RowData>    Data-reading logic
RichSinkFunction<RowData>      Data-writing logic

Note: SourceFunction/SinkFunction are deprecated in Flink 1.20 in favor of the unified Source (FLIP-27) and Sink (FLIP-143) APIs, but they still work and keep this tutorial compact.

3. Maven Project Setup

3.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0">
    <modelVersion>4.0.0</modelVersion>
    
    <groupId>com.example.flink</groupId>
    <artifactId>flink-connector-http</artifactId>
    <version>1.0.0</version>
    
    <properties>
        <flink.version>1.20.0</flink.version>
        <java.version>11</java.version>
    </properties>
    
    <dependencies>
        <!-- Flink Table API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        
        <!-- HTTP client -->
        <dependency>
            <groupId>org.apache.httpcomponents.client5</groupId>
            <artifactId>httpclient5</artifactId>
            <version>5.2.1</version>
        </dependency>
        
        <!-- JSON handling -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.15.2</version>
        </dependency>
    </dependencies>
</project>

4. Source Connector Implementation

4.1 Configuration Options Class

package com.example.flink.http;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import java.time.Duration;

public class HttpOptions {
    public static final String IDENTIFIER = "http";
    
    public static final ConfigOption<String> URL =
        ConfigOptions.key("url")
            .stringType()
            .noDefaultValue()
            .withDescription("HTTP endpoint URL");
    
    public static final ConfigOption<String> METHOD =
        ConfigOptions.key("method")
            .stringType()
            .defaultValue("GET")
            .withDescription("HTTP method");
    
    public static final ConfigOption<String> AUTH_TOKEN =
        ConfigOptions.key("auth.token")
            .stringType()
            .noDefaultValue()
            .withDescription("Bearer token");
    
    public static final ConfigOption<Duration> POLL_INTERVAL =
        ConfigOptions.key("poll.interval")
            .durationType()
            .defaultValue(Duration.ofSeconds(5))
            .withDescription("Polling interval");
    
    public static final ConfigOption<String> JSON_PATH =
        ConfigOptions.key("json.path")
            .stringType()
            .noDefaultValue()
            .withDescription("JSON path to extract data");
    
    public static final ConfigOption<Integer> MAX_RETRIES =
        ConfigOptions.key("max.retries")
            .intType()
            .defaultValue(3)
            .withDescription("Max retry attempts");
}

4.2 Source Factory

package com.example.flink.http;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import java.util.HashSet;
import java.util.Set;

public class HttpDynamicTableSourceFactory implements DynamicTableSourceFactory {
    
    @Override
    public String factoryIdentifier() {
        return HttpOptions.IDENTIFIER;
    }
    
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HttpOptions.URL);
        return options;
    }
    
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HttpOptions.METHOD);
        options.add(HttpOptions.AUTH_TOKEN);
        options.add(HttpOptions.POLL_INTERVAL);
        options.add(HttpOptions.JSON_PATH);
        options.add(HttpOptions.MAX_RETRIES);
        return options;
    }
    
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper = 
            FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        
        ReadableConfig config = helper.getOptions();
        
        return new HttpDynamicTableSource(
            context.getCatalogTable().getResolvedSchema(),
            config
        );
    }
}

4.3 Dynamic Table Source

package com.example.flink.http;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;

public class HttpDynamicTableSource implements ScanTableSource {
    
    private final ResolvedSchema schema;
    private final ReadableConfig config;
    
    public HttpDynamicTableSource(ResolvedSchema schema, ReadableConfig config) {
        this.schema = schema;
        this.config = config;
    }
    
    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }
    
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        SourceFunction<RowData> sourceFunction = 
            new HttpSourceFunction(schema, config);
        return SourceFunctionProvider.of(sourceFunction, false);
    }
    
    @Override
    public DynamicTableSource copy() {
        return new HttpDynamicTableSource(schema, config);
    }
    
    @Override
    public String asSummaryString() {
        return "HTTP Table Source";
    }
}

4.4 Source Function (Core Logic)

package com.example.flink.http;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;

public class HttpSourceFunction extends RichSourceFunction<RowData> {
    
    private static final Logger LOG = LoggerFactory.getLogger(HttpSourceFunction.class);
    private static final ObjectMapper MAPPER = new ObjectMapper();
    
    private final ResolvedSchema schema;
    private final ReadableConfig config;
    private volatile boolean running = true;
    private CloseableHttpClient httpClient;
    
    public HttpSourceFunction(ResolvedSchema schema, ReadableConfig config) {
        this.schema = schema;
        this.config = config;
    }
    
    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) {
        httpClient = HttpClients.createDefault();
        LOG.info("HTTP Source opened");
    }
    
    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {
        String url = config.get(HttpOptions.URL);
        Duration pollInterval = config.get(HttpOptions.POLL_INTERVAL);
        String jsonPath = config.getOptional(HttpOptions.JSON_PATH).orElse(null);
        
        LOG.info("Start polling from: {}", url);
        
        while (running) {
            try {
                // Execute the HTTP GET request
                String response = fetchData(url);
                
                if (response != null) {
                    // Parse the JSON response
                    JsonNode root = MAPPER.readTree(response);
                    JsonNode dataNode = extractDataNode(root, jsonPath);
                    
                    // Convert to RowData and emit
                    if (dataNode != null) {
                        if (dataNode.isArray()) {
                            for (JsonNode item : dataNode) {
                                RowData row = jsonToRowData(item);
                                if (row != null) {
                                    ctx.collect(row);
                                }
                            }
                        } else {
                            RowData row = jsonToRowData(dataNode);
                            if (row != null) {
                                ctx.collect(row);
                            }
                        }
                    }
                }
                
                Thread.sleep(pollInterval.toMillis());
                
            } catch (InterruptedException e) {
                LOG.info("HTTP source interrupted");
                break;
            } catch (Exception e) {
                LOG.error("Error fetching HTTP data", e);
                Thread.sleep(pollInterval.toMillis());
            }
        }
    }
    
    @Override
    public void cancel() {
        running = false;
    }
    
    @Override
    public void close() throws Exception {
        if (httpClient != null) {
            httpClient.close();
        }
    }
    
    /**
     * Fetch data from the HTTP endpoint
     */
    private String fetchData(String url) {
        try {
            HttpGet request = new HttpGet(url);
            
            // Set the auth token
            config.getOptional(HttpOptions.AUTH_TOKEN).ifPresent(token ->
                request.setHeader("Authorization", "Bearer " + token)
            );
            
            try (CloseableHttpResponse response = httpClient.execute(request)) {
                if (response.getCode() >= 200 && response.getCode() < 300) {
                    return EntityUtils.toString(response.getEntity());
                } else {
                    LOG.warn("HTTP request failed: {}", response.getCode());
                }
            }
        } catch (Exception e) {
            LOG.error("HTTP request error", e);
        }
        return null;
    }
    
    /**
     * Extract the data node from the JSON response
     */
    private JsonNode extractDataNode(JsonNode root, String jsonPath) {
        if (jsonPath == null || jsonPath.isEmpty()) {
            return root;
        }
        
        JsonNode current = root;
        for (String part : jsonPath.split("\\.")) {
            current = current.get(part);
            if (current == null) {
                LOG.warn("JSON path not found: {}", jsonPath);
                return null;
            }
        }
        return current;
    }
    
    /**
     * Convert JSON to RowData
     */
    private RowData jsonToRowData(JsonNode json) {
        try {
            int fieldCount = schema.getColumnCount();
            GenericRowData row = new GenericRowData(fieldCount);
            
            for (int i = 0; i < fieldCount; i++) {
                String fieldName = schema.getColumnNames().get(i);
                LogicalType type = schema.getColumnDataTypes().get(i).getLogicalType();
                JsonNode fieldNode = json.get(fieldName);
                
                if (fieldNode == null || fieldNode.isNull()) {
                    row.setField(i, null);
                } else {
                    row.setField(i, convertValue(fieldNode, type));
                }
            }
            
            return row;
        } catch (Exception e) {
            LOG.error("Error converting JSON to RowData", e);
            return null;
        }
    }
    
    /**
     * Type conversion
     */
    private Object convertValue(JsonNode node, LogicalType type) {
        LogicalTypeRoot typeRoot = type.getTypeRoot();
        
        switch (typeRoot) {
            case VARCHAR:
            case CHAR:
                return StringData.fromString(node.asText());
            case BOOLEAN:
                return node.asBoolean();
            case INTEGER:
                return node.asInt();
            case BIGINT:
                return node.asLong();
            case DOUBLE:
                return node.asDouble();
            case FLOAT:
                return (float) node.asDouble();
            default:
                return StringData.fromString(node.asText());
        }
    }
}
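The dot-separated json.path traversal in extractDataNode above can be exercised in isolation. Below is a minimal sketch that swaps Jackson's JsonNode for plain nested Maps (a hypothetical stand-in, so the logic runs without any dependencies):

```java
import java.util.Map;

public class JsonPathSketch {
    // Same traversal as extractDataNode: split on '.', descend one level per part,
    // and return null as soon as a segment is missing.
    static Object extract(Object root, String jsonPath) {
        if (jsonPath == null || jsonPath.isEmpty()) {
            return root;
        }
        Object current = root;
        for (String part : jsonPath.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(part);
            if (current == null) {
                return null;
            }
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> response = Map.of(
            "code", 200,
            "result", Map.of("data", "payload"));
        System.out.println(extract(response, "result.data"));    // payload
        System.out.println(extract(response, "result.missing")); // null
        System.out.println(extract(response, ""));               // the whole map
    }
}
```

Note that a missing intermediate segment yields null rather than an exception, matching the WARN-and-skip behavior of the source function.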

5. Sink Connector Implementation

5.1 Sink Factory

package com.example.flink.http;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
import java.util.HashSet;
import java.util.Set;

public class HttpDynamicTableSinkFactory implements DynamicTableSinkFactory {
    
    @Override
    public String factoryIdentifier() {
        return HttpOptions.IDENTIFIER;
    }
    
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HttpOptions.URL);
        return options;
    }
    
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HttpOptions.METHOD);
        options.add(HttpOptions.AUTH_TOKEN);
        options.add(HttpOptions.MAX_RETRIES);
        return options;
    }
    
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = 
            FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        
        return new HttpDynamicTableSink(
            context.getCatalogTable().getResolvedSchema(),
            helper.getOptions()
        );
    }
}

5.2 Dynamic Table Sink

package com.example.flink.http;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;

public class HttpDynamicTableSink implements DynamicTableSink {
    
    private final ResolvedSchema schema;
    private final ReadableConfig config;
    
    public HttpDynamicTableSink(ResolvedSchema schema, ReadableConfig config) {
        this.schema = schema;
        this.config = config;
    }
    
    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return ChangelogMode.insertOnly();
    }
    
    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        SinkFunction<RowData> sinkFunction = 
            new HttpSinkFunction(schema, config);
        return SinkFunctionProvider.of(sinkFunction);
    }
    
    @Override
    public DynamicTableSink copy() {
        return new HttpDynamicTableSink(schema, config);
    }
    
    @Override
    public String asSummaryString() {
        return "HTTP Table Sink";
    }
}

5.3 Sink Function

package com.example.flink.http;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpSinkFunction extends RichSinkFunction<RowData> {
    
    private static final Logger LOG = LoggerFactory.getLogger(HttpSinkFunction.class);
    private static final ObjectMapper MAPPER = new ObjectMapper();
    
    private final ResolvedSchema schema;
    private final ReadableConfig config;
    private CloseableHttpClient httpClient;
    
    public HttpSinkFunction(ResolvedSchema schema, ReadableConfig config) {
        this.schema = schema;
        this.config = config;
    }
    
    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) {
        httpClient = HttpClients.createDefault();
        LOG.info("HTTP Sink opened");
    }
    
    @Override
    public void invoke(RowData value, Context context) throws Exception {
        String url = config.get(HttpOptions.URL);
        
        // Convert the RowData to JSON
        ObjectNode json = rowDataToJson(value);
        String jsonString = MAPPER.writeValueAsString(json);
        
        // Send the HTTP POST request
        sendData(url, jsonString);
    }
    
    @Override
    public void close() throws Exception {
        if (httpClient != null) {
            httpClient.close();
        }
    }
    
    /**
     * Convert RowData to JSON
     */
    private ObjectNode rowDataToJson(RowData row) {
        ObjectNode json = MAPPER.createObjectNode();
        
        for (int i = 0; i < schema.getColumnCount(); i++) {
            String fieldName = schema.getColumnNames().get(i);
            LogicalType type = schema.getColumnDataTypes().get(i).getLogicalType();
            
            if (row.isNullAt(i)) {
                json.putNull(fieldName);
                continue;
            }
            
            switch (type.getTypeRoot()) {
                case VARCHAR:
                case CHAR:
                    json.put(fieldName, row.getString(i).toString());
                    break;
                case BOOLEAN:
                    json.put(fieldName, row.getBoolean(i));
                    break;
                case INTEGER:
                    json.put(fieldName, row.getInt(i));
                    break;
                case BIGINT:
                    json.put(fieldName, row.getLong(i));
                    break;
                case DOUBLE:
                    json.put(fieldName, row.getDouble(i));
                    break;
                case FLOAT:
                    json.put(fieldName, row.getFloat(i));
                    break;
                default:
                    json.put(fieldName, row.getString(i).toString());
            }
        }
        
        return json;
    }
    
    /**
     * Send data to the HTTP endpoint
     */
    private void sendData(String url, String jsonData) {
        int maxRetries = config.get(HttpOptions.MAX_RETRIES);
        
        for (int retry = 0; retry <= maxRetries; retry++) {
            try {
                HttpPost request = new HttpPost(url);
                request.setHeader("Content-Type", "application/json");
                
                // Set authentication
                config.getOptional(HttpOptions.AUTH_TOKEN).ifPresent(token ->
                    request.setHeader("Authorization", "Bearer " + token)
                );
                
                request.setEntity(new StringEntity(jsonData));
                
                try (CloseableHttpResponse response = httpClient.execute(request)) {
                    if (response.getCode() >= 200 && response.getCode() < 300) {
                        LOG.debug("Data sent successfully");
                        return;
                    } else {
                        LOG.warn("HTTP request failed: {}", response.getCode());
                    }
                }
            } catch (Exception e) {
                LOG.error("Error sending data (attempt {}/{})", retry + 1, maxRetries + 1, e);
                if (retry < maxRetries) {
                    try {
                        Thread.sleep(1000 * (retry + 1));
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }
}

6. SPI Service Registration

6.1 Create the Service Descriptor File

Create a file under src/main/resources/META-INF/services/:

File name: org.apache.flink.table.factories.Factory

File contents:

com.example.flink.http.HttpDynamicTableSourceFactory
com.example.flink.http.HttpDynamicTableSinkFactory

This uses the Java SPI mechanism: Flink discovers and registers your connector automatically.


7. Complete Usage Example

7.1 Mock HTTP API Service

First, create a simple HTTP service for testing:

# http_api_server.py
from flask import Flask, request, jsonify
import random
from datetime import datetime

app = Flask(__name__)

# Orders received via the webhook
orders = []

@app.route('/api/orders', methods=['GET'])
def get_orders():
    """Return a list of orders"""
    # Generate some test data
    new_orders = [
        {
            'order_id': f'ORD{i}',
            'user_id': random.randint(1000, 1005),
            'product_name': random.choice(['Phone', 'Laptop', 'Headphones']),
            'amount': round(random.uniform(100, 5000), 2),
            'order_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }
        for i in range(random.randint(1, 5))
    ]
    
    return jsonify({
        'code': 200,
        'data': new_orders,
        'message': 'success'
    })

@app.route('/api/webhook', methods=['POST'])
def webhook():
    """Receive data posted by Flink"""
    data = request.json
    print(f"Received: {data}")
    orders.append(data)
    return jsonify({'code': 200, 'message': 'received'})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

Start the service:

pip install flask
python http_api_server.py
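If you would rather stay in Java, the JDK's built-in com.sun.net.httpserver can stand in for the Flask service. The sketch below is a simplified alternative under stated assumptions: it serves a single canned order (payload shape mirroring the Flask response) and, in main, queries itself once on an ephemeral port to verify the endpoint; call start(5000) and keep the process alive to use it with the examples that follow.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class MockOrderServer {
    // Canned response matching the Flask server's {code, data, message} envelope
    static final String BODY =
        "{\"code\":200,\"data\":[{\"order_id\":\"ORD1\",\"user_id\":1001,"
      + "\"product_name\":\"Phone\",\"amount\":199.0,"
      + "\"order_time\":\"2024-01-01 00:00:00\"}],\"message\":\"success\"}";

    // Start the mock /api/orders endpoint; port 0 picks a free port.
    static HttpServer start(int port) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/api/orders", exchange -> {
            byte[] bytes = BODY.getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().add("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, bytes.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(bytes);
            }
        });
        server.start();
        return server;
    }

    public static void main(String[] args) throws Exception {
        HttpServer server = start(0);
        int port = server.getAddress().getPort();
        // Self-check: fetch the canned payload once, then shut down
        try (InputStream in = new URL("http://localhost:" + port + "/api/orders").openStream()) {
            System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
        }
        server.stop(0);
    }
}
```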

7.2 Flink SQL Usage Example

package com.example.flink.demo;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class HttpConnectorDemo {
    
    public static void main(String[] args) throws Exception {
        // 1. Create the environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        
        // 2. Create the HTTP source table
        tableEnv.executeSql(
            "CREATE TABLE http_orders (" +
            "  order_id STRING," +
            "  user_id BIGINT," +
            "  product_name STRING," +
            "  amount DOUBLE," +
            "  order_time STRING" +
            ") WITH (" +
            "  'connector' = 'http'," +
            "  'url' = 'http://localhost:5000/api/orders'," +
            "  'method' = 'GET'," +
            "  'poll.interval' = '5s'," +
            "  'json.path' = 'data'" +  // extract the array from the response's "data" field
            ")"
        );
        
        // 3. Create the HTTP sink table (webhook)
        tableEnv.executeSql(
            "CREATE TABLE http_webhook (" +
            "  user_id BIGINT," +
            "  total_amount DOUBLE," +
            "  order_count BIGINT" +
            ") WITH (" +
            "  'connector' = 'http'," +
            "  'url' = 'http://localhost:5000/api/webhook'," +
            "  'method' = 'POST'," +
            "  'max.retries' = '3'" +
            ")"
        );
        
        // 4. (Optional) Query and print the data for testing. Note that print()
        //    blocks until the job is cancelled, so comment it out before step 5.
        // tableEnv.executeSql("SELECT * FROM http_orders").print();
        
        // 5. Continuously aggregate and send to the webhook.
        //    Caveat: a non-windowed GROUP BY emits update changes, while this
        //    sink declares insert-only; for real deployments use a windowed
        //    aggregation or extend the sink's getChangelogMode() to accept updates.
        tableEnv.executeSql(
            "INSERT INTO http_webhook " +
            "SELECT " +
            "  user_id," +
            "  SUM(amount) AS total_amount," +
            "  COUNT(*) AS order_count " +
            "FROM http_orders " +
            "GROUP BY user_id"
        );
        // executeSql(INSERT ...) submits its own job, so env.execute() is not needed.
    }
}

7.3 Pure SQL Usage

-- Create the HTTP source table
CREATE TABLE http_api_source (
    order_id STRING,
    user_id BIGINT,
    product_name STRING,
    amount DOUBLE,
    order_time STRING  -- kept as STRING: the sample convertValue() only handles primitive types
) WITH (
    'connector' = 'http',
    'url' = 'http://localhost:5000/api/orders',
    'method' = 'GET',
    'poll.interval' = '10s',
    'json.path' = 'data',
    'max.retries' = '3'
);

-- Create the HTTP sink table
CREATE TABLE http_result_sink (
    user_id BIGINT,
    total_amount DOUBLE,
    order_count BIGINT,
    update_time STRING  -- STRING because the sample rowDataToJson() only handles primitive types
) WITH (
    'connector' = 'http',
    'url' = 'http://localhost:5000/api/webhook',
    'method' = 'POST'
);

-- Aggregate in real time and write to the webhook
INSERT INTO http_result_sink
SELECT 
    user_id,
    SUM(amount) AS total_amount,
    COUNT(*) AS order_count,
    CAST(CURRENT_TIMESTAMP AS STRING) AS update_time
FROM http_api_source
GROUP BY user_id;

7.4 Example with Authentication

-- Bearer token authentication
CREATE TABLE secure_http_source (
    id BIGINT,
    name STRING,
    value DOUBLE
) WITH (
    'connector' = 'http',
    'url' = 'https://api.example.com/data',
    'method' = 'GET',
    'auth.token' = 'your-bearer-token-here',
    'poll.interval' = '30s'
);

8. Testing and Debugging

8.1 Unit Test

package com.example.flink.http;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.junit.jupiter.api.Test;

public class HttpConnectorTest {
    
    @Test
    public void testHttpSource() throws Exception {
        StreamExecutionEnvironment env = 
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        
        // Create the table (requires a test endpoint running at localhost:5000)
        tableEnv.executeSql(
            "CREATE TABLE test_http (" +
            "  id INT," +
            "  name STRING" +
            ") WITH (" +
            "  'connector' = 'http'," +
            "  'url' = 'http://localhost:5000/api/test'," +
            "  'poll.interval' = '5s'" +
            ")"
        );
        
        // Query test
        tableEnv.executeSql("SELECT * FROM test_http LIMIT 10").print();
    }
}

8.2 Debug Logging

Flink 1.20 ships with Log4j 2, so enable debug logging in conf/log4j.properties with the Log4j 2 property syntax:

logger.http.name = com.example.flink.http
logger.http.level = DEBUG

8.3 Troubleshooting Common Issues

Problem 1: Connector not found

Caused by: org.apache.flink.table.api.ValidationException: 
Could not find any factory for identifier 'http'

Solutions:

  • Check that the SPI descriptor file is correct
  • Confirm the JAR has been added to Flink's lib directory
  • Verify the factory's factoryIdentifier() return value
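A quick way to verify the first two points from code: the descriptor must be visible on the classpath. The sketch below (a hypothetical helper with no Flink dependency) simulates a connector JAR's layout in a temp directory and confirms that a classloader rooted there can see the descriptor, while an empty classloader cannot:

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class SpiCheck {
    static final String SPI_RESOURCE =
        "META-INF/services/org.apache.flink.table.factories.Factory";

    // True if the given classloader can see the SPI descriptor file.
    static boolean isRegistered(ClassLoader cl) {
        return cl.getResource(SPI_RESOURCE) != null;
    }

    public static void main(String[] args) throws Exception {
        // Simulate the connector JAR's layout in a temp directory
        Path root = Files.createTempDirectory("spi-check");
        Path descriptor = root.resolve(SPI_RESOURCE);
        Files.createDirectories(descriptor.getParent());
        Files.write(descriptor, List.of(
            "com.example.flink.http.HttpDynamicTableSourceFactory",
            "com.example.flink.http.HttpDynamicTableSinkFactory"));

        try (URLClassLoader withJar =
                 new URLClassLoader(new URL[]{root.toUri().toURL()}, null);
             URLClassLoader withoutJar =
                 new URLClassLoader(new URL[]{}, null)) {
            System.out.println("with descriptor:    " + isRegistered(withJar));    // true
            System.out.println("without descriptor: " + isRegistered(withoutJar)); // false
        }
    }
}
```

In a real session you can run the same getResource check against Thread.currentThread().getContextClassLoader() to see whether your deployed JAR actually exposes the descriptor.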

Problem 2: Data cannot be parsed

Error converting JSON to RowData

Solutions:

  • Check that the JSON structure matches the table definition
  • Use json.path to extract the correct node
  • Verify the field type mappings

9. Best Practices

9.1 Performance Optimization

Batch processing

// Accumulate rows in the sink and send them in batches
private final List<RowData> buffer = new ArrayList<>();
private static final int BATCH_SIZE = 100;

@Override
public void invoke(RowData value, Context context) {
    buffer.add(value);
    if (buffer.size() >= BATCH_SIZE) {
        flushBuffer();
    }
}

private void flushBuffer() {
    // serialize the buffered rows into a single request, send, then clear;
    // also call this from close() so a partial final batch is not lost
    buffer.clear();
}

Connection pool reuse

// Manage HTTP connections with a pooling connection manager
private static final PoolingHttpClientConnectionManager connManager = 
    new PoolingHttpClientConnectionManager();

static {
    connManager.setMaxTotal(200);
    connManager.setDefaultMaxPerRoute(20);
}

Asynchronous requests

// Use an async HTTP client to improve throughput
CloseableHttpAsyncClient asyncClient = HttpAsyncClients.createDefault();

9.2 Fault Tolerance

Retry mechanism

// Retry with exponential backoff
for (int retry = 0; retry <= maxRetries; retry++) {
    try {
        // execute the request
        return executeRequest();
    } catch (Exception e) {
        if (retry < maxRetries) {
            Thread.sleep((long) Math.pow(2, retry) * 1000);
        } else {
            throw e;
        }
    }
}
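The backoff schedule above (delay = 2^retry seconds) can be factored into a small helper and checked on its own. A minimal sketch; the 30-second cap is an added safeguard with an arbitrary value, not part of the original snippet:

```java
public class Backoff {
    // Exponential backoff: 1s, 2s, 4s, 8s, ... capped at maxMillis.
    static long delayMillis(int retry, long maxMillis) {
        long delay = (long) Math.pow(2, retry) * 1000L;
        return Math.min(delay, maxMillis);
    }

    public static void main(String[] args) {
        for (int retry = 0; retry < 6; retry++) {
            System.out.println("retry " + retry + " -> "
                + delayMillis(retry, 30_000) + " ms");
        }
    }
}
```

Capping matters because an uncapped 2^retry grows past any sensible wait after roughly ten attempts.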

Timeout configuration

RequestConfig requestConfig = RequestConfig.custom()
    .setConnectTimeout(Timeout.ofSeconds(10))
    .setResponseTimeout(Timeout.ofSeconds(30))
    .build();

9.3 Monitoring Metrics

// Add custom metrics
public class HttpSourceFunction extends RichSourceFunction<RowData> {
    
    private transient Counter requestCounter;
    private transient Meter errorMeter;
    
    @Override
    public void open(Configuration parameters) {
        requestCounter = getRuntimeContext()
            .getMetricGroup()
            .counter("http_requests");
            
        errorMeter = getRuntimeContext()
            .getMetricGroup()
            .meter("http_errors", new MeterView(60));
    }
    
    private void executeRequest() throws Exception {
        requestCounter.inc();
        try {
            // perform the HTTP request
        } catch (Exception e) {
            errorMeter.markEvent();
            throw e;
        }
    }
}

10. Summary

This tutorial walked through developing a custom SQL connector for Flink 1.20:

  • Core components: Factory, DynamicTableSource/Sink, SourceFunction/SinkFunction
  • Complete implementation: Source and Sink sides of an HTTP connector
  • Hands-on example: a full Flask API + Flink SQL demo
  • Best practices: performance optimization, fault tolerance, monitoring metrics

With this HTTP connector as a template, you can build other custom connectors, such as:

  • A WebSocket connector
  • A MongoDB connector
  • A Redis connector
  • Connectors for in-house enterprise systems
