Flink 2.x 實(shí)時(shí) Data + AI

Flink 使用介紹相關(guān)文檔目錄

Flink 使用介紹相關(guān)文檔目錄

前言

Flink 2.1.0新增了AI功能,標(biāo)志著實(shí)時(shí)數(shù)據(jù)引擎向統(tǒng)一 Data + AI 的里程碑式跨越。

目前Flink 2.1.x版本實(shí)現(xiàn)了基于OpenAI接口的流式推理任務(wù)。支持使用函數(shù)調(diào)用LLM,實(shí)現(xiàn)chat_competion功能。

本篇為大家?guī)?Flink + AI 功能的使用演示。

環(huán)境配置和使用

環(huán)境信息:

  • Flink 2.1.1
  • JDK17

Flink 2.x以上版本不再提供JDK 8 的支持,請各位用戶將JDK升級為JDK 17。

使用如下命令下載Flink,并解壓到任意目錄:

wget https://dlcdn.apache.org/flink/flink-2.1.1/flink-2.1.1-bin-scala_2.12.tgz

然后配置JDK 17 JAVA_HOME:

export JAVA_HOME=/path/to/jdk17

接下來下載依賴:Maven Repository: org.apache.flink ? flink-model-openai ? 2.1.1。放入Flink安裝目錄的lib中。

為了演示方便,建議使用Flink的Standalone模式運(yùn)行。配置方法如下:

  1. 編輯FLINK_HOMEconf/mastersconf/workers文件,指定需要啟動(dòng)的JobManager,TaskManager的位置(哪個(gè)host上)和個(gè)數(shù)。
  2. 執(zhí)行FLINK_HOME/bin/start-cluster.sh,啟動(dòng)集群。
  3. 集群啟動(dòng)成功之后,執(zhí)行FLINK_HOME/bin/sql-client.sh embedded,啟動(dòng)Flink SQL client。

注意:如果想要在Yarn上運(yùn)行Flink 2.x版本,因?yàn)楫?dāng)前Yarn只支持JDK 8,需要在JDK 8 的Yarn上運(yùn)行JDK 17的Flink。具體配置方法請參考Flink 使用之配置與調(diào)優(yōu)中的Flink on Yarn 模式配置JDK一節(jié)。

使用Flink SQL調(diào)用大語言模型的步驟:

  1. 創(chuàng)建Model。需要指明模型提供方和系統(tǒng)提示詞。
  2. 調(diào)用ML_PREDICT函數(shù)。指定Model和作為提示詞傳入的字段。

官網(wǎng)示例如下:

-- 創(chuàng)建Model
-- 修改endpoint,api-key和model與真實(shí)環(huán)境對應(yīng)后執(zhí)行
CREATE MODEL ai_analyze_sentiment
INPUT (`input` STRING)
OUTPUT (`content` STRING)
WITH (
    'provider'='openai',
    'endpoint'='http://ip:10000/v1/chat/completions',
    'api-key' = 'your-api-key',
    'model'='qwen3',
    'system-prompt' = 'Classify the text below into one of the following labels: [positive, negative, neutral, mixed]. Output only the label./no_think'
);

-- 示例數(shù)據(jù),一個(gè)只有一行數(shù)據(jù)的臨時(shí)view
-- 真實(shí)使用時(shí)可以是Kafka表,Hudi表等等
CREATE TEMPORARY VIEW movie_comment(id, movie_name,  user_comment, actual_label) AS VALUES
  (1, 'Good Stuff', 'The part where children guess the sounds is my favorite. It is a very romantic narrative compared to other movies I have seen. Very gentle and full of love.', 'positive');

-- 執(zhí)行推理,顯示推理結(jié)果
SELECT id, movie_name, content as predicit_label, actual_label
FROM ML_PREDICT(
  TABLE movie_comment,
  MODEL ai_analyze_sentiment,
  DESCRIPTOR(user_comment));

配置參數(shù)

以下參數(shù)摘自官網(wǎng)列出的常規(guī)配置參數(shù)。這些參數(shù)為接入LLM的常規(guī)配置項(xiàng)。不再一一解釋。

Common #

Option Required Default Type Description
provider required (none) String Specifies the model function provider to use, must be 'openai'.
endpoint required (none) String Full URL of the OpenAI API endpoint, e.g. https://api.openai.com/v1/chat/completions or https://api.openai.com/v1/embeddings.
api-key required (none) String OpenAI API key for authentication.
model required (none) String Model name, e.g. gpt-3.5-turbo, text-embedding-ada-002.

Chat Completions #

Option Required Default Type Description
system-prompt optional "You are a helpful assistant." String The input message for the system role.
temperature optional null Double Controls randomness of output, range [0.0, 1.0]. See temperature
top-p optional null Double Probability cutoff for token selection (used instead of temperature). See top_p
stop optional null String Stop sequences, comma-separated list. See stop
max-tokens optional null Long Maximum number of tokens to generate. See max tokens

此外,F(xiàn)link官方還提供了Async predict(異步推理支持)。能夠平衡吞吐量和調(diào)用延遲。

使用示例如下:

SELECT * FROM ML_PREDICT(TABLE input, MODEL mdl, descriptor(f1, f2), MAP['async', 'true']);

官方提供配置項(xiàng)主要是和請求的并行度,以及請求超時(shí)和失敗重試機(jī)制相關(guān)。示例配置項(xiàng)如下所示:

table.exec.async-ml-predict.max-concurrent-operations: 10
table.exec.async-ml-predict.timeout: 30s
table.exec.async-ml-predict.retry-strategy: FIXED_DELAY
table.exec.async-ml-predict.fixed-delay: 10s
table.exec.async-ml-predict.max-attempts: 3

參考文獻(xiàn):OpenAI | Apache Flink

社區(qū)未來規(guī)劃

FLIP-548: Introduce Tool and Agent in Flink SQL - Apache Flink - Apache Software Foundation:提供tools和agent管理調(diào)用功能。
FLIP-540: Support VECTOR_SEARCH in Flink SQL - Apache Flink - Apache Software Foundation:提供向量檢索功能。
FLIP-525: Model ML_PREDICT, ML_EVALUATE Implementation Design - Apache Flink - Apache Software Foundation:完善LM_PREDICT和ML_EVALUATE。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容