note:本文是《用 Pulsar 開發(fā)多人在線小游戲》的第三篇,配套源碼和全部文檔參見我的 GitHub 倉(cāng)庫(kù) play-with-pulsar 以及我的文章列表。
我推薦《數(shù)據(jù)密集型應(yīng)用系統(tǒng)設(shè)計(jì)》這本書的第四章:編碼與演化(在線閱讀地址)。
編碼(encoding)和演化(evolution)是兩個(gè)不同的概念,我舉例說(shuō)明一下。
編碼是把內(nèi)存中的數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)化成某種格式的字節(jié)序列,方便傳輸或存儲(chǔ)。
我們最常見的編碼方式就是 JSON 了,JSON 的好處就是字符串處理起來(lái)很簡(jiǎn)單,且易于人類閱讀;但問題是支持的數(shù)據(jù)類型不夠豐富,且傳輸效率不高。為了解決 JSON 的這些問題,就出現(xiàn)了 XML 格式或者 protobuf 這樣的二進(jìn)制編碼協(xié)議。
確定了編碼方式,生產(chǎn)者就可以愉快地把數(shù)據(jù)編碼成對(duì)應(yīng)的格式發(fā)送進(jìn) Pulsar,消費(fèi)者只要按照按照同樣的協(xié)議解碼,就能得到原始數(shù)據(jù)了。
但問題是,我們開發(fā)的程序是在不斷變化的,所以數(shù)據(jù)本身的結(jié)構(gòu)很可能發(fā)生變化,這也就是演化的概念。
比如說(shuō)一開始我把這個(gè) User 類序列化成 JSON 字符串然后發(fā)到 Pulsar 中:
class User {
String name;
int id;
}
但是隨著業(yè)務(wù)發(fā)展,我發(fā)現(xiàn)用 int 類型已經(jīng)無(wú)法表示用戶 ID 了,需要把 id 這個(gè)字段改為字符串類型:
class User {
String name;
String id;
}
這種情況下,生產(chǎn)者依然可以把這個(gè)類序列化成 JSON 然后發(fā)到 Pulsar 中,Pulsar 不關(guān)心數(shù)據(jù)本身的內(nèi)容,默認(rèn)把所有數(shù)據(jù)都視為字節(jié)數(shù)組,所以會(huì)欣然接受生產(chǎn)者發(fā)來(lái)的消息,但消費(fèi)者那邊消費(fèi)數(shù)據(jù)時(shí)肯定會(huì)出問題。
那你說(shuō),我同時(shí)改生產(chǎn)者和消費(fèi)者的代碼還不行嗎?可以,但依然存在很多問題,比如說(shuō):
1、在復(fù)雜的業(yè)務(wù)場(chǎng)景中,數(shù)據(jù)處理的邏輯可以非常復(fù)雜,生產(chǎn)者和消費(fèi)者可能橫跨多個(gè)部門,協(xié)調(diào)成本很高。
2、必須先下線所有消費(fèi)者,代碼升級(jí)完之后才能重新上線。否則消費(fèi)者突然消費(fèi)到無(wú)法識(shí)別的新的數(shù)據(jù)格式,會(huì)產(chǎn)生不可預(yù)期的錯(cuò)誤。
所以,讓系統(tǒng)能夠更加靈活地適應(yīng)變化,就是 schema 能夠給我們提供的價(jià)值。
有了 schema 來(lái)描述數(shù)據(jù)的格式,我們就可以設(shè)置數(shù)據(jù)結(jié)構(gòu)的兼容性(compatibility),比方說(shuō)我們可以設(shè)置數(shù)據(jù)向后兼容,即新代碼可以讀取舊格式的數(shù)據(jù);或者設(shè)置向前兼容,即舊代碼可以讀取新格式的數(shù)據(jù)。
schema 相關(guān)的官網(wǎng)文檔:
https://pulsar.apache.org/docs/next/schema-overview/
我們可以設(shè)置 schema 的兼容性,官網(wǎng)文檔:
https://pulsar.apache.org/docs/next/schema-understand#schema-compatibility-check-strategy
我們?cè)趧?chuàng)建 Pulsar 的生產(chǎn)者/消費(fèi)者時(shí)可以指定消息的 schema:
// 指定生產(chǎn)者的 schema
Consumer<User> consumer = client.newConsumer(Schema.AVRO(User.class))
.subscriptionType(SubscriptionType.Shared)
.topic(topicName)
.subscriptionName(subscriptionName)
.subscribe();
// 指定消費(fèi)者的 schema
Producer<User> producer = client.newProducer(Schema.AVRO(User.class))
.topic(topicName)
.create();
每個(gè) topic 的 schema 演化信息都會(huì)存在 Pulsar 當(dāng)中,這樣當(dāng)生產(chǎn)者使用新的 schema 時(shí),Pulsar 會(huì)判斷新的 schema 是否符合當(dāng)前的兼容性設(shè)置,如果符合則更新 topic 對(duì)應(yīng)的 schema,否則的話則會(huì)拒絕生產(chǎn)者發(fā)來(lái)的消息,這樣消費(fèi)者就不會(huì)收到不符合預(yù)期的數(shù)據(jù)了。
在我們的炸彈人游戲中,玩家可以產(chǎn)生很多不同類型的事件,這些事件應(yīng)該以什么格式發(fā)送到 Pulsar 的 topic 中呢?
我的做法是創(chuàng)建了一個(gè) EventMessage 結(jié)構(gòu),用 Type 字段標(biāo)識(shí)事件的類型:
type EventMessage struct {
// Event type
Type string `json:"type"`
Name string `json:"name"`
Avatar string `json:"avatar"`
// Comment stores extra data
Comment string `json:"comment"`
X int `json:"x"`
Y int `json:"y"`
Alive bool `json:"alive"`
List []int `json:"list"`
}
然后用 Avro 的方式定義了一個(gè) JSONSchema:
const eventJsonSchemaDef = `
{
"type": "record",
"name": "EventMessage",
"namespace": "game",
"fields": [
{
"name": "Type",
"type": "string"
},
{
"name": "Name",
"type": "string"
},
{
"name": "Avatar",
"type": "string"
},
{
"name": "Comment",
"type": "string",
"default": ""
},
{
"name": "X",
"type": "int"
},
{
"name": "Y",
"type": "int"
},
{
"name": "Alive",
"type": "boolean"
},
{
"name": "List",
"type": {
"type": "array",
"items" : {
"type":"int"
}
}
}
]
}
`
// player event topicName
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topicName,
// use schema to confirm the structure of message
Schema: pulsar.NewJSONSchema(eventJsonSchemaDef, nil),
})
這樣一來(lái),就可以通過(guò) Schema 的約束避免未來(lái)可能產(chǎn)生的很多問題。
更多高質(zhì)量干貨文章,請(qǐng)關(guān)注我的微信公眾號(hào) labuladong 和算法博客 labuladong 的算法秘籍。