Flink 1.20 Custom SQL Connector Tutorial - A Complete HTTP Connector Implementation
A complete walkthrough of building a custom Table/SQL connector for Apache Flink 1.20, with full HTTP connector source code and examples
Table of Contents
- 1. Overview
- 2. Core Architecture
- 3. Maven Project Setup
- 4. Source Connector Implementation
- 5. Sink Connector Implementation
- 6. SPI Service Registration
- 7. End-to-End Usage Example
- 8. Testing and Debugging
- 9. Best Practices
1. Overview
1.1 What Is a Custom Connector?
A Flink connector is the bridge between Flink and an external system:
- Source: reads data from the external system
- Sink: writes data to the external system
1.2 Use Cases for an HTTP Connector
- Calling RESTful APIs to fetch real-time data
- Polling HTTP endpoints for updates
- POSTing processed results to a webhook
- Integrating with third-party data services
2. Core Architecture
2.1 Component Relationships
SQL/Table API
↓
Factory
↓
DynamicTableSource/Sink
↓
SourceFunction/SinkFunction
↓
External HTTP API
2.2 Core Interfaces
| Interface | Purpose |
|---|---|
| DynamicTableSourceFactory | Creates the source |
| DynamicTableSinkFactory | Creates the sink |
| ScanTableSource | A table source that scans the external system |
| RichSourceFunction<RowData> | The read logic |
| RichSinkFunction<RowData> | The write logic |
Note: SourceFunction/SinkFunction are deprecated in Flink 1.20 in favor of the newer Source (FLIP-27) and Sink V2 APIs, but they remain the simplest way to demonstrate how the connector pieces wire together.
3. Maven Project Setup
3.1 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.flink</groupId>
<artifactId>flink-connector-http</artifactId>
<version>1.0.0</version>
<properties>
<flink.version>1.20.0</flink.version>
<java.version>11</java.version>
</properties>
<dependencies>
<!-- Flink Table API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Bridge module that provides SourceFunctionProvider/SinkFunctionProvider -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- HTTP client -->
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.2.1</version>
</dependency>
<!-- JSON processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
</dependencies>
</project>
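3.2 Packaging the Connector JAR
Because httpclient5 and jackson-databind are not `provided`, they must be bundled into the connector JAR. One common way is the Maven Shade Plugin; the fragment below is a sketch to add under the project root (the plugin version is an assumption, pin whatever your build standardizes on). The `ServicesResourceTransformer` matters: it merges `META-INF/services` files so the SPI registration from section 6 survives shading.

```xml
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.5.1</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <!-- Merge META-INF/services files so SPI registration survives shading -->
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```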
4. Source Connector Implementation
4.1 Configuration Options
package com.example.flink.http;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import java.time.Duration;
public class HttpOptions {
public static final String IDENTIFIER = "http";
public static final ConfigOption<String> URL =
ConfigOptions.key("url")
.stringType()
.noDefaultValue()
.withDescription("HTTP endpoint URL");
public static final ConfigOption<String> METHOD =
ConfigOptions.key("method")
.stringType()
.defaultValue("GET")
.withDescription("HTTP method");
public static final ConfigOption<String> AUTH_TOKEN =
ConfigOptions.key("auth.token")
.stringType()
.noDefaultValue()
.withDescription("Bearer token");
public static final ConfigOption<Duration> POLL_INTERVAL =
ConfigOptions.key("poll.interval")
.durationType()
.defaultValue(Duration.ofSeconds(5))
.withDescription("Polling interval");
public static final ConfigOption<String> JSON_PATH =
ConfigOptions.key("json.path")
.stringType()
.noDefaultValue()
.withDescription("JSON path to extract data");
public static final ConfigOption<Integer> MAX_RETRIES =
ConfigOptions.key("max.retries")
.intType()
.defaultValue(3)
.withDescription("Max retry attempts");
}
4.2 Source Factory
package com.example.flink.http;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import java.util.HashSet;
import java.util.Set;
public class HttpDynamicTableSourceFactory implements DynamicTableSourceFactory {
@Override
public String factoryIdentifier() {
return HttpOptions.IDENTIFIER;
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(HttpOptions.URL);
return options;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(HttpOptions.METHOD);
options.add(HttpOptions.AUTH_TOKEN);
options.add(HttpOptions.POLL_INTERVAL);
options.add(HttpOptions.JSON_PATH);
options.add(HttpOptions.MAX_RETRIES);
return options;
}
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
helper.validate();
ReadableConfig config = helper.getOptions();
return new HttpDynamicTableSource(
context.getCatalogTable().getResolvedSchema(),
config
);
}
}
4.3 Dynamic Table Source
package com.example.flink.http;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
public class HttpDynamicTableSource implements ScanTableSource {
private final ResolvedSchema schema;
private final ReadableConfig config;
public HttpDynamicTableSource(ResolvedSchema schema, ReadableConfig config) {
this.schema = schema;
this.config = config;
}
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
SourceFunction<RowData> sourceFunction =
new HttpSourceFunction(schema, config);
return SourceFunctionProvider.of(sourceFunction, false);
}
@Override
public DynamicTableSource copy() {
return new HttpDynamicTableSource(schema, config);
}
@Override
public String asSummaryString() {
return "HTTP Table Source";
}
}
4.4 Source Function (Core Logic)
package com.example.flink.http;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
public class HttpSourceFunction extends RichSourceFunction<RowData> {
private static final Logger LOG = LoggerFactory.getLogger(HttpSourceFunction.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
private final ResolvedSchema schema;
private final ReadableConfig config;
private volatile boolean running = true;
private CloseableHttpClient httpClient;
public HttpSourceFunction(ResolvedSchema schema, ReadableConfig config) {
this.schema = schema;
this.config = config;
}
@Override
public void open(org.apache.flink.configuration.Configuration parameters) {
httpClient = HttpClients.createDefault();
LOG.info("HTTP Source opened");
}
@Override
public void run(SourceContext<RowData> ctx) throws Exception {
String url = config.get(HttpOptions.URL);
Duration pollInterval = config.get(HttpOptions.POLL_INTERVAL);
String jsonPath = config.getOptional(HttpOptions.JSON_PATH).orElse(null);
LOG.info("Start polling from: {}", url);
while (running) {
try {
// Execute the HTTP GET request
String response = fetchData(url);
if (response != null) {
// Parse the JSON response
JsonNode root = MAPPER.readTree(response);
JsonNode dataNode = extractDataNode(root, jsonPath);
// Convert to RowData and emit
if (dataNode != null) {
if (dataNode.isArray()) {
for (JsonNode item : dataNode) {
RowData row = jsonToRowData(item);
if (row != null) {
ctx.collect(row);
}
}
} else {
RowData row = jsonToRowData(dataNode);
if (row != null) {
ctx.collect(row);
}
}
}
}
Thread.sleep(pollInterval.toMillis());
} catch (InterruptedException e) {
LOG.info("HTTP source interrupted");
break;
} catch (Exception e) {
LOG.error("Error fetching HTTP data", e);
Thread.sleep(pollInterval.toMillis());
}
}
}
@Override
public void cancel() {
running = false;
}
@Override
public void close() throws Exception {
if (httpClient != null) {
httpClient.close();
}
}
/**
* Fetches data from the HTTP endpoint.
*/
private String fetchData(String url) {
try {
HttpGet request = new HttpGet(url);
// Attach the bearer token if configured
config.getOptional(HttpOptions.AUTH_TOKEN).ifPresent(token ->
request.setHeader("Authorization", "Bearer " + token)
);
try (CloseableHttpResponse response = httpClient.execute(request)) {
if (response.getCode() >= 200 && response.getCode() < 300) {
return EntityUtils.toString(response.getEntity());
} else {
LOG.warn("HTTP request failed: {}", response.getCode());
}
}
} catch (Exception e) {
LOG.error("HTTP request error", e);
}
return null;
}
/**
* Extracts the target data node from the JSON response.
*/
private JsonNode extractDataNode(JsonNode root, String jsonPath) {
if (jsonPath == null || jsonPath.isEmpty()) {
return root;
}
JsonNode current = root;
for (String part : jsonPath.split("\\.")) {
current = current.get(part);
if (current == null) {
LOG.warn("JSON path not found: {}", jsonPath);
return null;
}
}
return current;
}
/**
* Converts a JSON object to RowData.
*/
private RowData jsonToRowData(JsonNode json) {
try {
int fieldCount = schema.getColumnCount();
GenericRowData row = new GenericRowData(fieldCount);
for (int i = 0; i < fieldCount; i++) {
String fieldName = schema.getColumnNames().get(i);
LogicalType type = schema.getColumnDataTypes().get(i).getLogicalType();
JsonNode fieldNode = json.get(fieldName);
if (fieldNode == null || fieldNode.isNull()) {
row.setField(i, null);
} else {
row.setField(i, convertValue(fieldNode, type));
}
}
return row;
} catch (Exception e) {
LOG.error("Error converting JSON to RowData", e);
return null;
}
}
/**
* Converts a single JSON field to the Flink internal type.
*/
private Object convertValue(JsonNode node, LogicalType type) {
LogicalTypeRoot typeRoot = type.getTypeRoot();
switch (typeRoot) {
case VARCHAR:
case CHAR:
return StringData.fromString(node.asText());
case BOOLEAN:
return node.asBoolean();
case INTEGER:
return node.asInt();
case BIGINT:
return node.asLong();
case DOUBLE:
return node.asDouble();
case FLOAT:
return (float) node.asDouble();
default:
return StringData.fromString(node.asText());
}
}
}
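The `extractDataNode` traversal above is just a walk along a dot-separated path. The same logic can be exercised without Flink or Jackson by walking nested maps; the class and method names below are illustrative only, not part of the connector:

```java
import java.util.Map;

/** Illustrative stand-in for extractDataNode, using nested Maps instead of Jackson nodes. */
public class DotPathDemo {

    /** Walk a nested map along a dot-separated path; returns null if any segment is missing. */
    public static Object extract(Object root, String path) {
        if (path == null || path.isEmpty()) {
            return root;
        }
        Object current = root;
        for (String part : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null; // reached a leaf before the path was fully consumed
            }
            current = ((Map<?, ?>) current).get(part);
            if (current == null) {
                return null; // path segment not found
            }
        }
        return current;
    }

    public static void main(String[] args) {
        // Mirrors the {"code": ..., "data": ...} envelope used by the demo API in section 7
        Map<String, Object> response = Map.of(
                "code", 200,
                "data", Map.of("order_id", "ORD1"));
        System.out.println(extract(response, "data.order_id")); // ORD1
        System.out.println(extract(response, "data.missing"));  // null
    }
}
```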
5. Sink Connector Implementation
5.1 Sink Factory
package com.example.flink.http;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
import java.util.HashSet;
import java.util.Set;
public class HttpDynamicTableSinkFactory implements DynamicTableSinkFactory {
@Override
public String factoryIdentifier() {
return HttpOptions.IDENTIFIER;
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(HttpOptions.URL);
return options;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(HttpOptions.METHOD);
options.add(HttpOptions.AUTH_TOKEN);
options.add(HttpOptions.MAX_RETRIES);
return options;
}
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
helper.validate();
return new HttpDynamicTableSink(
context.getCatalogTable().getResolvedSchema(),
helper.getOptions()
);
}
}
5.2 Dynamic Table Sink
package com.example.flink.http;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
public class HttpDynamicTableSink implements DynamicTableSink {
private final ResolvedSchema schema;
private final ReadableConfig config;
public HttpDynamicTableSink(ResolvedSchema schema, ReadableConfig config) {
this.schema = schema;
this.config = config;
}
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
// Accept whatever the upstream produces. A GROUP BY aggregation emits
// update rows, which a strictly insert-only sink would reject at planning time.
return requestedMode;
}
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
SinkFunction<RowData> sinkFunction =
new HttpSinkFunction(schema, config);
return SinkFunctionProvider.of(sinkFunction);
}
@Override
public DynamicTableSink copy() {
return new HttpDynamicTableSink(schema, config);
}
@Override
public String asSummaryString() {
return "HTTP Table Sink";
}
}
5.3 Sink Function
package com.example.flink.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HttpSinkFunction extends RichSinkFunction<RowData> {
private static final Logger LOG = LoggerFactory.getLogger(HttpSinkFunction.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
private final ResolvedSchema schema;
private final ReadableConfig config;
private CloseableHttpClient httpClient;
public HttpSinkFunction(ResolvedSchema schema, ReadableConfig config) {
this.schema = schema;
this.config = config;
}
@Override
public void open(org.apache.flink.configuration.Configuration parameters) {
httpClient = HttpClients.createDefault();
LOG.info("HTTP Sink opened");
}
@Override
public void invoke(RowData value, Context context) throws Exception {
// Skip retractions; only forward insert and upsert rows to the endpoint
if (value.getRowKind() == org.apache.flink.types.RowKind.UPDATE_BEFORE
|| value.getRowKind() == org.apache.flink.types.RowKind.DELETE) {
return;
}
String url = config.get(HttpOptions.URL);
// Convert the RowData to JSON
ObjectNode json = rowDataToJson(value);
String jsonString = MAPPER.writeValueAsString(json);
// Send it via HTTP POST
sendData(url, jsonString);
}
@Override
public void close() throws Exception {
if (httpClient != null) {
httpClient.close();
}
}
/**
* Converts RowData to a JSON object.
*/
private ObjectNode rowDataToJson(RowData row) {
ObjectNode json = MAPPER.createObjectNode();
for (int i = 0; i < schema.getColumnCount(); i++) {
String fieldName = schema.getColumnNames().get(i);
LogicalType type = schema.getColumnDataTypes().get(i).getLogicalType();
if (row.isNullAt(i)) {
json.putNull(fieldName);
continue;
}
switch (type.getTypeRoot()) {
case VARCHAR:
case CHAR:
json.put(fieldName, row.getString(i).toString());
break;
case BOOLEAN:
json.put(fieldName, row.getBoolean(i));
break;
case INTEGER:
json.put(fieldName, row.getInt(i));
break;
case BIGINT:
json.put(fieldName, row.getLong(i));
break;
case DOUBLE:
json.put(fieldName, row.getDouble(i));
break;
case FLOAT:
json.put(fieldName, row.getFloat(i));
break;
default:
json.put(fieldName, row.getString(i).toString());
}
}
return json;
}
/**
* Sends data to the HTTP endpoint with retries.
*/
private void sendData(String url, String jsonData) {
int maxRetries = config.get(HttpOptions.MAX_RETRIES);
for (int retry = 0; retry <= maxRetries; retry++) {
try {
HttpPost request = new HttpPost(url);
request.setHeader("Content-Type", "application/json");
// Attach the bearer token if configured
config.getOptional(HttpOptions.AUTH_TOKEN).ifPresent(token ->
request.setHeader("Authorization", "Bearer " + token)
);
// Use UTF-8 explicitly; StringEntity otherwise defaults to ISO-8859-1
request.setEntity(new StringEntity(jsonData, java.nio.charset.StandardCharsets.UTF_8));
try (CloseableHttpResponse response = httpClient.execute(request)) {
if (response.getCode() >= 200 && response.getCode() < 300) {
LOG.debug("Data sent successfully");
return;
} else {
LOG.warn("HTTP request failed: {}", response.getCode());
}
}
} catch (Exception e) {
LOG.error("Error sending data (attempt {}/{})", retry + 1, maxRetries + 1, e);
if (retry < maxRetries) {
try {
Thread.sleep(1000 * (retry + 1));
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
LOG.error("Failed to send data after {} attempts; record dropped", maxRetries + 1);
}
}
6. SPI Service Registration
6.1 Create the Service Descriptor File
Create a file under the src/main/resources/META-INF/services/ directory:
File name: org.apache.flink.table.factories.Factory
File content:
com.example.flink.http.HttpDynamicTableSourceFactory
com.example.flink.http.HttpDynamicTableSinkFactory
This is the Java SPI mechanism; Flink discovers and registers your connector automatically.
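The steps above can be scripted; run this from the project root (the path assumes the Maven layout from section 3):

```shell
# Create the SPI descriptor that lets Flink discover both factories
mkdir -p src/main/resources/META-INF/services
cat > src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory <<'EOF'
com.example.flink.http.HttpDynamicTableSourceFactory
com.example.flink.http.HttpDynamicTableSinkFactory
EOF
```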
7. End-to-End Usage Example
7.1 A Mock HTTP API Service
First, create a simple HTTP service for testing:
# http_api_server.py
from flask import Flask, request, jsonify
import random
from datetime import datetime

app = Flask(__name__)

# Records received via the webhook
orders = []

@app.route('/api/orders', methods=['GET'])
def get_orders():
    """Return a list of orders."""
    # Generate some test data
    new_orders = [
        {
            'order_id': f'ORD{i}',
            'user_id': random.randint(1000, 1005),
            'product_name': random.choice(['Phone', 'Laptop', 'Headphones']),
            'amount': round(random.uniform(100, 5000), 2),
            'order_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }
        for i in range(random.randint(1, 5))
    ]
    return jsonify({
        'code': 200,
        'data': new_orders,
        'message': 'success'
    })

@app.route('/api/webhook', methods=['POST'])
def webhook():
    """Receive data pushed by Flink."""
    data = request.json
    print(f"Received data: {data}")
    orders.append(data)
    return jsonify({'code': 200, 'message': 'received'})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)
Start the service:
pip install flask
python http_api_server.py
7.2 Flink SQL Example (Java API)
package com.example.flink.demo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class HttpConnectorDemo {
public static void main(String[] args) throws Exception {
// 1. Create the execution environments
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 2. Create the HTTP source table
tableEnv.executeSql(
"CREATE TABLE http_orders (" +
" order_id STRING," +
" user_id BIGINT," +
" product_name STRING," +
" amount DOUBLE," +
" order_time STRING" +
") WITH (" +
" 'connector' = 'http'," +
" 'url' = 'http://localhost:5000/api/orders'," +
" 'method' = 'GET'," +
" 'poll.interval' = '5s'," +
" 'json.path' = 'data'" + // extract the array from the response's "data" field
")"
);
// 3. Create the HTTP sink table (webhook)
tableEnv.executeSql(
"CREATE TABLE http_webhook (" +
" user_id BIGINT," +
" total_amount DOUBLE," +
" order_count BIGINT" +
") WITH (" +
" 'connector' = 'http'," +
" 'url' = 'http://localhost:5000/api/webhook'," +
" 'method' = 'POST'," +
" 'max.retries' = '3'" +
")"
);
// 4. For ad-hoc inspection you can print the source table, but print()
//    blocks on an unbounded stream, so leave it commented out when
//    running the INSERT below:
// tableEnv.executeSql("SELECT * FROM http_orders").print();
// 5. Aggregate in real time and send to the webhook.
//    executeSql() submits the INSERT job itself, so env.execute() is not
//    needed here; await() keeps the client attached to the running job.
tableEnv.executeSql(
"INSERT INTO http_webhook " +
"SELECT " +
" user_id," +
" SUM(amount) AS total_amount," +
" COUNT(*) AS order_count " +
"FROM http_orders " +
"GROUP BY user_id"
).await();
}
}
7.3 Pure SQL Usage
-- Create the HTTP source table
CREATE TABLE http_api_source (
order_id STRING,
user_id BIGINT,
product_name STRING,
amount DOUBLE,
order_time STRING  -- keep as STRING: the demo connector only converts basic types
) WITH (
'connector' = 'http',
'url' = 'http://localhost:5000/api/orders',
'method' = 'GET',
'poll.interval' = '10s',
'json.path' = 'data',
'max.retries' = '3'
);
-- Create the HTTP sink table
CREATE TABLE http_result_sink (
user_id BIGINT,
total_amount DOUBLE,
order_count BIGINT,
update_time STRING  -- keep as STRING: the demo connector only converts basic types
) WITH (
'connector' = 'http',
'url' = 'http://localhost:5000/api/webhook',
'method' = 'POST'
);
-- Aggregate in real time and write to the webhook
INSERT INTO http_result_sink
SELECT
user_id,
SUM(amount) AS total_amount,
COUNT(*) AS order_count,
CAST(CURRENT_TIMESTAMP AS STRING) AS update_time
FROM http_api_source
GROUP BY user_id;
7.4 Example with Authentication
-- Authenticate with a Bearer token
CREATE TABLE secure_http_source (
id BIGINT,
name STRING,
value DOUBLE
) WITH (
'connector' = 'http',
'url' = 'https://api.example.com/data',
'method' = 'GET',
'auth.token' = 'your-bearer-token-here',
'poll.interval' = '30s'
);
8. Testing and Debugging
8.1 Unit Test
package com.example.flink.http;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.junit.jupiter.api.Test;
public class HttpConnectorTest {
@Test
public void testHttpSource() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Create the table
tableEnv.executeSql(
"CREATE TABLE test_http (" +
" id INT," +
" name STRING" +
") WITH (" +
" 'connector' = 'http'," +
" 'url' = 'http://localhost:5000/api/test'," +
" 'poll.interval' = '5s'" +
")"
);
// Query it (requires an HTTP service running at the URL above)
tableEnv.executeSql("SELECT * FROM test_http LIMIT 10").print();
}
}
8.2 Debug Logging
Flink 1.20 ships with Log4j 2 (its conf/log4j.properties uses Log4j 2 syntax), so enable debug logging for the connector like this:
logger.http.name = com.example.flink.http
logger.http.level = DEBUG
8.3 Troubleshooting
Problem 1: Connector not found
Caused by: org.apache.flink.table.api.ValidationException:
Could not find any factory for identifier 'http'
Solution:
- Check that the SPI descriptor file is correct
- Confirm the connector JAR has been added to Flink's lib directory (or the job classpath)
- Verify the factory's factoryIdentifier() return value
Problem 2: Data cannot be parsed
Error converting JSON to RowData
Solution:
- Check that the JSON structure matches the table definition
- Use json.path to extract the correct node
- Verify the field type mappings
9. Best Practices
9.1 Performance Optimization
Batching
// Accumulate rows in the sink and send them in batches
private final List<RowData> buffer = new ArrayList<>();
private static final int BATCH_SIZE = 100;
@Override
public void invoke(RowData value, Context context) {
buffer.add(value);
if (buffer.size() >= BATCH_SIZE) {
flushBuffer(); // sends the batch; also call from close() to flush the tail
}
}
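A minimal, Flink-free sketch of that buffering pattern (the class and names are illustrative; a production sink would also flush on checkpoints so no records are lost on failure):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Size-triggered batching, independent of Flink (illustrative names). */
public class BatchBuffer<T> {
    private final List<T> buffer = new ArrayList<>();
    private final int batchSize;
    private final Consumer<List<T>> flusher;

    public BatchBuffer(int batchSize, Consumer<List<T>> flusher) {
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    /** Add one record; triggers a flush once the batch is full. */
    public void add(T value) {
        buffer.add(value);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Send the pending batch; call this from close() too, so the tail is not lost. */
    public void flush() {
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        BatchBuffer<String> demo = new BatchBuffer<>(2, batch -> System.out.println("POST " + batch));
        demo.add("a");
        demo.add("b"); // full batch: POST [a, b]
        demo.add("c");
        demo.flush(); // tail: POST [c]
    }
}
```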
Connection Pooling
// Manage HTTP connections with a shared pool
private static final PoolingHttpClientConnectionManager connManager =
new PoolingHttpClientConnectionManager();
static {
connManager.setMaxTotal(200);
connManager.setDefaultMaxPerRoute(20);
}
// Build clients on top of the shared pool:
// httpClient = HttpClients.custom().setConnectionManager(connManager).build();
Asynchronous Requests
// Use the async HTTP client for higher throughput
CloseableHttpAsyncClient asyncClient = HttpAsyncClients.createDefault();
9.2 Fault Tolerance
Retry with Backoff
// Retry with exponential backoff
for (int retry = 0; retry <= maxRetries; retry++) {
try {
// 執(zhí)行請求
return executeRequest();
} catch (Exception e) {
if (retry < maxRetries) {
Thread.sleep((long) Math.pow(2, retry) * 1000);
} else {
throw e;
}
}
}
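The delay schedule above grows as 1s, 2s, 4s, and so on; in practice you usually cap it so a long outage does not produce unbounded waits. A standalone sketch of the computation (the cap is an addition not shown in the loop above, and the class name is illustrative):

```java
/** Illustrative helper for the exponential backoff schedule (not part of the connector). */
public class BackoffDemo {

    /** Delay before retry attempt n: baseMillis * 2^n, capped at capMillis. */
    public static long backoffMillis(int retry, long baseMillis, long capMillis) {
        long delay = (long) (Math.pow(2, retry) * baseMillis);
        return Math.min(delay, capMillis);
    }

    public static void main(String[] args) {
        // Print the schedule for a 1s base and a 10s cap
        for (int retry = 0; retry < 6; retry++) {
            System.out.println("attempt " + retry + " -> wait "
                    + backoffMillis(retry, 1000, 10_000) + " ms");
        }
    }
}
```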
Timeouts
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(Timeout.ofSeconds(10))
.setResponseTimeout(Timeout.ofSeconds(30))
.build();
// Apply it when constructing the client:
// httpClient = HttpClients.custom().setDefaultRequestConfig(requestConfig).build();
9.3 Monitoring Metrics
// Add custom metrics
public class HttpSourceFunction extends RichSourceFunction<RowData> {
private transient Counter requestCounter;
private transient Meter errorMeter;
@Override
public void open(Configuration parameters) {
requestCounter = getRuntimeContext()
.getMetricGroup()
.counter("http_requests");
errorMeter = getRuntimeContext()
.getMetricGroup()
.meter("http_errors", new MeterView(60));
}
private void executeRequest() {
requestCounter.inc();
try {
// perform the HTTP request here
} catch (Exception e) {
errorMeter.markEvent();
// rethrow unchecked so the method does not need a throws clause
throw new RuntimeException(e);
}
}
}
10. Summary
This tutorial walked through custom SQL connector development for Flink 1.20 end to end:
- Core components: Factory, DynamicTableSource/Sink, SourceFunction/SinkFunction
- Complete implementation: Source and Sink for an HTTP connector
- Working example: a full Flask API + Flink SQL demo
- Best practices: performance tuning, fault tolerance, and metrics
Using this HTTP connector as a template, you can build other custom connectors, such as:
- WebSocket connectors
- MongoDB connectors
- Redis connectors
- Connectors for internal enterprise systems