Flink 使用介紹相關(guān)文檔目錄
前言
Flink 2.1.0新增了AI功能,標(biāo)志著實(shí)時(shí)數(shù)據(jù)引擎向統(tǒng)一 Data + AI 的里程碑式跨越。
目前Flink 2.1.x版本實(shí)現(xiàn)了基于OpenAI接口的流式推理任務(wù)。支持使用函數(shù)調(diào)用LLM,實(shí)現(xiàn)chat_competion功能。
本篇為大家?guī)?Flink + AI 功能的使用演示。
環(huán)境配置和使用
環(huán)境信息:
- Flink 2.1.1
- JDK17
Flink 2.x以上版本不再提供JDK 8 的支持,請各位用戶將JDK升級為JDK 17。
使用如下命令下載Flink,并解壓到任意目錄:
wget https://dlcdn.apache.org/flink/flink-2.1.1/flink-2.1.1-bin-scala_2.12.tgz
然后配置JDK 17 JAVA_HOME:
export JAVA_HOME=/path/to/jdk17
接下來下載依賴:Maven Repository: org.apache.flink ? flink-model-openai ? 2.1.1。放入Flink安裝目錄的lib中。
為了演示方便,建議使用Flink的Standalone模式運(yùn)行。配置方法如下:
- 編輯
FLINK_HOME中conf/masters和conf/workers文件,指定需要啟動(dòng)的JobManager,TaskManager的位置(哪個(gè)host上)和個(gè)數(shù)。 - 執(zhí)行
FLINK_HOME/bin/start-cluster.sh,啟動(dòng)集群。 - 集群啟動(dòng)成功之后,執(zhí)行
FLINK_HOME/bin/sql-client.sh embedded,啟動(dòng)Flink SQL client。
注意:如果想要在Yarn上運(yùn)行Flink 2.x版本,因?yàn)楫?dāng)前Yarn只支持JDK 8,需要在JDK 8 的Yarn上運(yùn)行JDK 17的Flink。具體配置方法請參考Flink 使用之配置與調(diào)優(yōu)中的
Flink on Yarn 模式配置JDK一節(jié)。
使用Flink SQL調(diào)用大語言模型的步驟:
- 創(chuàng)建Model。需要指明模型提供方和系統(tǒng)提示詞。
- 調(diào)用ML_PREDICT函數(shù)。指定Model和作為提示詞傳入的字段。
官網(wǎng)示例如下:
-- 創(chuàng)建Model
-- 修改endpoint,api-key和model與真實(shí)環(huán)境對應(yīng)后執(zhí)行
CREATE MODEL ai_analyze_sentiment
INPUT (`input` STRING)
OUTPUT (`content` STRING)
WITH (
'provider'='openai',
'endpoint'='http://ip:10000/v1/chat/completions',
'api-key' = 'your-api-key',
'model'='qwen3',
'system-prompt' = 'Classify the text below into one of the following labels: [positive, negative, neutral, mixed]. Output only the label./no_think'
);
-- 示例數(shù)據(jù),一個(gè)只有一行數(shù)據(jù)的臨時(shí)view
-- 真實(shí)使用時(shí)可以是Kafka表,Hudi表等等
CREATE TEMPORARY VIEW movie_comment(id, movie_name, user_comment, actual_label) AS VALUES
(1, 'Good Stuff', 'The part where children guess the sounds is my favorite. It is a very romantic narrative compared to other movies I have seen. Very gentle and full of love.', 'positive');
-- 執(zhí)行推理,顯示推理結(jié)果
SELECT id, movie_name, content as predicit_label, actual_label
FROM ML_PREDICT(
TABLE movie_comment,
MODEL ai_analyze_sentiment,
DESCRIPTOR(user_comment));
配置參數(shù)
以下參數(shù)摘自官網(wǎng)列出的常規(guī)配置參數(shù)。這些參數(shù)為接入LLM的常規(guī)配置項(xiàng)。不再一一解釋。
Common #
Chat Completions #
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| system-prompt | optional | "You are a helpful assistant." | String | The input message for the system role. |
| temperature | optional | null | Double | Controls randomness of output, range [0.0, 1.0]. See temperature
|
| top-p | optional | null | Double | Probability cutoff for token selection (used instead of temperature). See top_p |
| stop | optional | null | String | Stop sequences, comma-separated list. See stop |
| max-tokens | optional | null | Long | Maximum number of tokens to generate. See max tokens |
此外,F(xiàn)link官方還提供了Async predict(異步推理支持)。能夠平衡吞吐量和調(diào)用延遲。
使用示例如下:
SELECT * FROM ML_PREDICT(TABLE input, MODEL mdl, descriptor(f1, f2), MAP['async', 'true']);
官方提供配置項(xiàng)主要是和請求的并行度,以及請求超時(shí)和失敗重試機(jī)制相關(guān)。示例配置項(xiàng)如下所示:
table.exec.async-ml-predict.max-concurrent-operations: 10
table.exec.async-ml-predict.timeout: 30s
table.exec.async-ml-predict.retry-strategy: FIXED_DELAY
table.exec.async-ml-predict.fixed-delay: 10s
table.exec.async-ml-predict.max-attempts: 3
參考文獻(xiàn):OpenAI | Apache Flink
社區(qū)未來規(guī)劃
FLIP-548: Introduce Tool and Agent in Flink SQL - Apache Flink - Apache Software Foundation:提供tools和agent管理調(diào)用功能。
FLIP-540: Support VECTOR_SEARCH in Flink SQL - Apache Flink - Apache Software Foundation:提供向量檢索功能。
FLIP-525: Model ML_PREDICT, ML_EVALUATE Implementation Design - Apache Flink - Apache Software Foundation:完善LM_PREDICT和ML_EVALUATE。