Apache Pulsar 之 Go Function

作者 | 冉小龍

審校 | Anonymitaet

編輯 | Susan

閱讀本文需要約 8 分鐘。

image

- 導讀 -

在學習如何編寫、部署 Go Function 之前,先向大家介紹一下 Go Function 的實現(xiàn)思路。

一篇文章了解 Pulsar Functions 中我們提到,F(xiàn)unction 作為 Pulsar 的計算層,其實現(xiàn)思路主要有如下兩種:

  • SDK

  • Plugin

之前介紹的 Java FunctionPython Function 使用 Plugin 的形式實現(xiàn),本篇介紹的 Go Function 卻使用 SDK 的形式提供給用戶使用。為什么我們不與 Java Function 和 Python Function 采用相同的形式卻要在 Go Function 中另辟蹊徑,以 SDK 的形式提供給用戶使用呢?

Go Function 使用 SDK 的原因

1

Go 不支持動態(tài)反射。這是語言層面本身的問題,Go 是一門靜態(tài)類型的語言,雖然支持了反射的功能,但反射的支持相對較弱,不支持動態(tài)反射的功能。

這一點與 Java 和 Python 不同,Python 是動態(tài)類型的語言,動態(tài)加載本身就是它的強項所在。Java 雖然是靜態(tài)類型的語言,但支持 ClassLoader 可以實現(xiàn)類似動態(tài)反射的功能。

所以在 Go 中我們沒辦法直接將用戶的代碼邏輯動態(tài)加載到 Pulsar Functions 中來執(zhí)行。

2

Go Plugin 不成熟。Go 1.8 官方實現(xiàn)了 Go Plugin,支持動態(tài)加載的特性,但是并不成熟,在調(diào)研過程中發(fā)現(xiàn)的主要問題如下:

  • 如果注入了一些非法模塊,會帶來一定的安全隱患,如何防范它們?

  • 給系統(tǒng)帶來一些不穩(wěn)定因素。如果模塊出現(xiàn)問題,則會導致服務崩潰。

  • 它給版本管理帶來了一定的困難,特別是在微服務場景下,相同的服務,加載了不同的插件,如何做版本的管理?

  • 社區(qū)相對不成熟。

基于以上考量,我們使用了 SDK 的形式來做 Go Function 的實現(xiàn)。

編寫 Go Function

Go Function 使用 SDK 實現(xiàn),將 Function 的接口以 SDK 的形式對外暴露給用戶,在使用 Go Function 之前需要 import "github.com/apache/pulsar/pulsar-function-go/pf",使用方式具體如下:

import (
"context"
"fmt"

"github.com/apache/pulsar/pulsar-function-go/pf"

)

func HandleRequest(ctx context.Context, input []byte) error {
fmt.Println(string(input) + "!")
return nil
}

func main() {
pf.Start(HandleRequest)
}

在上述示例代碼中,將輸入的 input 加 ! 后打印,第一個參數(shù)為一個 context 對象,當用戶編寫的 Function 需要與 Go Function 進行交互時,可以加入該參數(shù),使用示例如下:

if fc, ok := pf.FromContext(ctx); ok {
fmt.Printf("function ID is:%s, ", fc.GetFuncID())
fmt.Printf("function version is:%s\n", fc.GetFuncVersion())
}

在 main() 中,用戶只需要將編寫的 Function name 注冊到 Start() 中,需要注意的是,Start() 中只能接收一個函數(shù)的名字。Go Function 會根據(jù)接收到的 Function name 利用 Go 的反射來驗證用戶實現(xiàn)的參數(shù)列表和返回值列表是否正確。

為了方便驗證,需要規(guī)定用戶輸入的參數(shù)列表與返回值列表具體為什么,Go Function 目前支持如下形式的函數(shù)樣例:

func ()
func () error
func (input) error
func () (output, error)
func (input) (output, error)
func (context.Context) error
func (context.Context, input) error
func (context.Context) (output, error)
func (context.Context, input) (output, error)

在一切準備就緒之后,Go Function 會啟動 channel 來源源不斷地接收從 inputs topic 中傳入的數(shù)據(jù)。需要說明的是,用戶無需關心該 channel 關閉的時機,因為在 Function 的使用場景下,一旦啟動就會源源不斷的來處理輸入的數(shù)據(jù),用戶可以使用 Ctrl+C 來終止整個程序的運行,或者可以通過參數(shù) killAfterIdleMs 來配置該 Function 運行的時長,單位為毫秒。

部署 Go Function

Go Function 的實現(xiàn)依賴于 pulsar-client-go,在運行 Go Function 之前,需要保證 pulsar-client-go 正確安裝,具體安裝參考 pulsar-client-go 安裝。

pulsar-client-go:

https://pulsar.apache.org/docs/en/client-libraries-go/

Go Function 的實現(xiàn)形式雖然同 Java FunctionPython Function 不同,但是為了降低用戶的部署成本,對外暴露給用戶的部署方式是相同的,具體操作如下:

  1. 啟動 Pulsar。

  2. 編譯 Go Function。

$ go build [your function file name].go

3\. 啟動 Go Function。

./bin/pulsar-admin functions localrun/create
--go [your go function path]
--inputs [input topics]
--output [output topic]
--tenant [default:public]
--namespace [default:default]
--name [custom unique go function name]

注意:

  • 與 Java Function 和 Python Function 不同的是,在 Go Function 中,用戶無需指定 classname。

  • 在 Java Function 中,--jar 上傳的是打包好的 jar 文件;在 Python Function 中 --py 所上傳的是 python 文件;無論哪種形式,上傳的都屬于 user code。而 Go Function 中 --go 所上傳的是一份編譯好的可執(zhí)行文件,其中包含了 user code 和 Function 本身的代碼。

Go Function 目前不支持的功能如下:

  • Schema,目前 input 和 output 只允許為 []byte

  • Metrics

  • Secrets

  • Authentication & Authorization

更多關于 Pulsar 的技術干貨和產(chǎn)品動態(tài),請關注 StreamNative 微信公眾號。

下一場 Pulsar meetup 于 6/29 在深圳舉辦,歡迎來現(xiàn)場了解 Pulsar。

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容