Zookeeper 簡介
分布式系統(tǒng)的協(xié)調工作就是通過某種方式,讓每個節(jié)點的信息能夠同步和共享。這依賴于服務進程之間的通信。通信方式有兩種:
通過網(wǎng)絡進行信息共享
這就像現(xiàn)實中,開發(fā)leader在會上把任務傳達下去,組員通過聽leader命令或者看leader的郵件知道自己要干什么。當任務分配有變化時,leader會單獨告訴組員,或者再次召開會議。信息通過人與人之間的直接溝通,完成傳遞。通過共享存儲
這就好比開發(fā)leader按照約定的時間和路徑,把任務分配表放到了svn上,組員每天去svn上拉取最新的任務分配表,然后干活。其中svn就是共享存儲。更好一點的做法是,當svn文件版本更新時,觸發(fā)郵件通知,每個組員再去拉取最新的任務分配表。這樣做更好,因為每次更新,組員都能第一時間得到消
息,從而讓自己手中的任務分配表永遠是最新的。此種方式依賴于中央存儲。
ZooKeeper如何解決分布式系統(tǒng)面臨的問題
ZooKeeper對分布式系統(tǒng)的協(xié)調,使用的是第二種方式,共享存儲。其實共享存儲,分布式應用也需要
和存儲進行網(wǎng)絡通信。

注:Slave節(jié)點要想獲取ZooKeeper的更新通知,需事先在關心的數(shù)據(jù)節(jié)點上設置觀察點。
大多數(shù)分布式系統(tǒng)中出現(xiàn)的問題,都源于信息的共享出了問題。如果各個節(jié)點間信息不能及時共享和同步,那么就會在協(xié)作過程中產(chǎn)生各種問題。ZooKeeper解決協(xié)同問題的關鍵,就是在于保證分布式系統(tǒng)信息的一致性。
zookeeper的基本概念
Zookeeper是一個開源的分布式協(xié)調服務,其設計目標是將那些復雜的且容易出錯的分布式一致性服務封裝起來,構成一個高效可靠的原語集,并以一些簡單的接口提供給用戶使用。zookeeper是一個典型的分布式數(shù)據(jù)一致性的解決方案,分布式應用程序可以基于它實現(xiàn)諸如數(shù)據(jù)訂閱/發(fā)布、負載均衡、命名服務、集群管理、分布式鎖和分布式隊列等功能
基本概念
①集群角色
通常在分布式系統(tǒng)中,構成一個集群的每一臺機器都有自己的角色,最典型的集群就是Master/Slave模式(主備模式),此情況下把所有能夠處理寫操作的機器稱為Master機器,把所有通過異步復制方式獲取最新數(shù)據(jù),并提供讀服務的機器為Slave機器。
而在Zookeeper中,這些概念被顛覆了。它沒有沿用傳遞的Master/Slave概念,而是引入了Leader、Follower、Observer三種角色。Zookeeper集群中的所有機器通過Leader選舉來選定一臺被稱為Leader的機器,Leader服務器為客戶端提供讀和寫服務,除Leader外,其他機器包括Follower和Observer,Follower和Observer都能提供讀服務,唯一的區(qū)別在于Observer不參與Leader選舉過程,不參與寫操作的過半寫成功策略,因此Observer可以在不影響寫性能的情況下提升集群的性能。
②會話(session)
Session指客戶端會話,一個客戶端連接是指客戶端和服務端之間的一個TCP長連接, Zookeeper對外的服務端口默認為2181,客戶端啟動的時候,首先會與服務器建立一個TCP連接,從第一次連接建立開始,客戶端會話的生命周期也開始了,通過這個連接,客戶端能夠心跳檢測與服務器保持有效的會話,也能夠向 Zookeeper服務器發(fā)送請求并接受響應,同時還能夠通過該連接接受來自服務器的 Watch事件通知。
③數(shù)據(jù)節(jié)點(Znode)
在談到分布式的時候,我們通常說的“節(jié)點”是指組成集群的每一臺機器。然而,在 ZooKeeper中,“節(jié)點分為兩類,第一類同樣是指構成集群的機器,我們稱之為機器節(jié)點;第二類則是指數(shù)據(jù)模型中的數(shù)據(jù)單元,我們稱之為數(shù)據(jù)節(jié)點——ZNode。 ZooKeeper將所有數(shù)據(jù)存儲在內(nèi)存中,數(shù)據(jù)模型是一棵樹
(ZNode Tree),由斜杠(/)進行分割的路徑,就是一個node,例如app/path.每個node上都會保存自己的數(shù)據(jù)內(nèi)容,同時還會保存一系列屬性信息。
④版本
剛剛我們提到,Zookeeper的每個Znode上都會存儲數(shù)據(jù),對于每個ZNode,Zookeeper都會為其維護一個叫作 Stat 的數(shù)據(jù)結構,Stat記錄了這個ZNode的三個數(shù)據(jù)版本,分別是version(當前ZNode的版本)、cversion(當前ZNode子節(jié)點的版本)、aversion(當前ZNode的ACL版本)。
⑤Watcher(事件監(jiān)聽器)
Wathcer(事件監(jiān)聽器),是Zookeeper中一個很重要的特性,Zookeeper允許用戶在指定節(jié)點上注冊一些Watcher,并且在一些特定事件觸發(fā)的時候,Zookeeper服務端會將事件通知到感興趣的客戶端,該機制是Zookeeper實現(xiàn)分布式協(xié)調服務的重要特性
⑥ ACL
Zookeeper采用ACL(Access Control Lists)策略來進行權限控制,其定義了如下五種權限:
·CREATE:創(chuàng)建子節(jié)點的權限。
·READ:獲取節(jié)點數(shù)據(jù)和子節(jié)點列表的權限。
·WRITE:更新節(jié)點數(shù)據(jù)的權限。
·DELETE:刪除子節(jié)點的權限。
·ADMIN:設置節(jié)點ACL的權限。
其中需要注意的是,CREATE和DELETE這兩種權限都是針對子節(jié)點的權限控制
環(huán)境搭建
Zookeeper的搭建方式
Zookeeper安裝方式有三種,單機模式和集群模式以及偽集群模式。
- 單機模式:Zookeeper只運行在一臺服務器上,適合測試環(huán)境;
- 集群模式:Zookeeper運行于一個集群上,適合生產(chǎn)環(huán)境,這個計算機集群被稱為一個“集合體”
- 偽集群模式:就是在一臺服務器上運行多個Zookeeper實例;
單機模式搭建:
zookeeper安裝以linux環(huán)境為例:
1、下載
首先我們下載穩(wěn)定版本的zookeeper http://zookeeper.apache.org/releases.html
配置單節(jié)點
$ tar -zxf zookeeper-3.4.6.tar.gz
$ cd zookeeper-3.4.6
$ mkdir data
cd conf
mv zoo_sample.cfg zoo.cfg
vi conf/zoo.cfg
編輯文件設置 dataDir = /path/to/zookeeper/data
$ bin/zkServer.sh start
啟動CLI
$ bin/zkCli.sh
$ bin/zkCli.sh -server 需要連接的ip:需要連接的port
windows 下啟用 zk
$ zkCli.cmd
$ zkCli.cmd -server 需要連接的ip:需要連接的port
例如 zkCli.cmd 106.75.105.152, 不加端口,默認為 2181
若報錯, 則檢查是否防火墻攔截了.
Opening socket connection to server 192.168.153.12/192.168.153.12:2181.Will not attempt to authentic
停止ZooKeeper服務器
連接服務器并執(zhí)行所有操作后,可以使用以下命令停止zookeeper服務器。
$ bin/zkServer.sh stop
在 Zookeeper中,每一個數(shù)據(jù)節(jié)點都是一個 Znode,上圖根目錄下有兩個節(jié)點,分別是:app1和app2,其中app1下面又有三個子節(jié)點所有 Znode!按層次化進行組織,形成這么一顆樹, Znodel的節(jié)點路徑標識方式和 Unix文件系統(tǒng)路徑非常相似,都是由一系列使用斜杠(/)進行分割的路徑表示,開發(fā)人員可以向這個節(jié)點寫入數(shù)據(jù),也可以在這個節(jié)點下面創(chuàng)建子節(jié)點。
默認端口為 2181
配置偽集群模式
創(chuàng)建 data 文件夾 和 logs 文件夾
clientPort=2181
# 配置快照文件存放的目錄
dataDir=/zkcluster/zookeeper01/data
# 配置日志文件存放的目錄
dataLogDir=/zkcluster/zookeeper01/data/logs
clientPort=2182
dataDir=/zkcluster/zookeeper02/data
dataLogDir=/zkcluster/zookeeper02/data/logs
clientPort=2183
dataDir=/zkcluster/zookeeper03/data
dataLogDir=/zkcluster/zookeeper03/data/logs
data 下創(chuàng)建 myid 文件, 內(nèi)容分別為 1, 2, 3 (數(shù)字可以依次累增), 這個文件的作用就是記錄zk的id
server.服務器ID=服務器IP地址:服務器之間通信端口:服務器之間投票選舉端口
server.1=10.211.55.4:2881:3881
server.2=10.211.55.4:2882:3882
server.3=10.211.55.4:2883:3883
touch myid
分別向三臺服務器寫入數(shù)字 1 2 和 3.
啟動集群
分別啟動這三臺服務器
$ bin/zkServer.sh start
查看狀態(tài)
./zkServer.sh status
Zookeeper基本使用
ZooKeeper系統(tǒng)模型
ZooKeeper數(shù)據(jù)模型Znode在ZooKeeper中,數(shù)據(jù)信息被保存在一個個數(shù)據(jù)節(jié)點上,這些節(jié)點被稱為 ZNode。ZNode是Zookeeper中最小數(shù)據(jù)單位,在ZNode下面又可以再掛 ZNode,這樣一層層下去就形成了一個層次化命名空間ZNode樹,我們稱為ZNode Tree,它采用了類似文件系統(tǒng)的層級樹狀結構進行管理。見下圖示例:

四種節(jié)點類型
- 持久節(jié)點
- 持久順序節(jié)點
- 臨時節(jié)點
- 臨時順序節(jié)點
事務ID
ZNode 狀態(tài)信息
ACL
我們可以從三個方面來理解ACL機制:權限模式( Scheme)、授權對象(ID)、權限( Permission),通常使用" scheme:id: permission"來標識一個有效的ACL信息。
權限模式: Scheme
權限模式用來確定權限驗證過程中使用的檢驗策略
授權對象:ID
授權對象指的是權限賦予的用戶或一個指定實體,例如IP地址或是機器等。在不同的權限模式下,授權對象是不同的,表中列出了各個權限模式和授權對象之間的對應關系
權限
權限就是指那些通過權限檢査后可以被允許執(zhí)行的操作。在 Zookeeper中,所有對數(shù)據(jù)的操作權限分為以下五大類
- CREATE(C):數(shù)據(jù)節(jié)點的創(chuàng)建權限,允許授權對象在該數(shù)據(jù)節(jié)點下創(chuàng)建子節(jié)點。
- DELETE(D子節(jié)點的刪除權限,允許授權對象刪除該數(shù)據(jù)節(jié)點的子節(jié)點。?
- READ(R):數(shù)據(jù)節(jié)點的讀取權限,允許授權對象訪問該數(shù)據(jù)節(jié)點并讀取其數(shù)據(jù)內(nèi)容或子節(jié)點列表等。
- WRTE(W):數(shù)據(jù)節(jié)點的更新權限,允許授權對象對該數(shù)據(jù)節(jié)點進行更新操作。
- ADMIN(A):數(shù)據(jù)節(jié)點的管理權限,允許授權對象對該數(shù)據(jù)節(jié)點進行ACL相關的設置操作。
創(chuàng)建節(jié)點
使用 creates命令,可以創(chuàng)建一個 Zookeeper節(jié)點,如
create [-s][-e] path data acl
其中,-s 或 -e 分別指定節(jié)點特性,順序或臨時節(jié)點,若不指定,則創(chuàng)建持久節(jié)點;ac1用來進行權限控制。
- 創(chuàng)建永久(持久)節(jié)點
使用create /zk-permanent 123命令創(chuàng)建zk- permanent永久節(jié)點
[zk: Loca thost: 2181(CONNECTED) 1] create/zk-permanent 123
Created/Zk-permanent
[zk: localhost: 2181(CONNECTED) 2] Ls/
[zk-permanent, zookeeper, zk-test00000000041
- 創(chuàng)建持久順序節(jié)點
使用create -s /zk-test 123命令創(chuàng)建zk-test順序節(jié)點
lzk: Localhost: 2181(CONNECTED)0] create-s/zk-test 123
Created/Zk-testo000000004
執(zhí)行完后,就在根節(jié)點下創(chuàng)建了一個叫做 / zk-test 的節(jié)點,該節(jié)點內(nèi)容就是123,同時可以看到創(chuàng)建的
zk-test 節(jié)點后面添加了一串數(shù)字以示區(qū)別
創(chuàng)建臨時節(jié)點
使用create -e /zk-temp 123命令創(chuàng)建zk-temp臨時節(jié)點創(chuàng)建臨時順序節(jié)點
create -e -s /zk-temp 123
可以看到永久節(jié)點不同于順序節(jié)點, 不會自動在后面添加一串數(shù)字
quit 退出客戶端
讀取節(jié)點
與讀取相關的命令有 ls 命令和 get 命令
ls 命令可以列出 Zookeeper指定節(jié)點下的所有子節(jié)點,但只能查看指定節(jié)點下的第一級的所有子節(jié)點;
ls path
其中,path表示的是指定數(shù)據(jù)節(jié)點的節(jié)點路徑
get命令可以獲取 Zookeeper:指定節(jié)點的數(shù)據(jù)內(nèi)容和屬性信息。
get path
若獲取根節(jié)點下面的所有子節(jié)點,使用 ls 命令即可
若想獲取/zk-permanente的數(shù)據(jù)內(nèi)容和屬性,可使用如下命令:get /zk-permanent
更新節(jié)點
使用set命令,可以更新指定節(jié)點的數(shù)據(jù)內(nèi)容,用法如下
set path data [version]
其中,data就是要更新的新內(nèi)容, version表示數(shù)據(jù)版本,在 zookeeper中,節(jié)點的數(shù)據(jù)是有版本概
念的,這個參數(shù)用于指定本次更新操作是基于 Inode的哪一個數(shù)據(jù)版本進行的,如將/zk- permanent節(jié)
點的數(shù)據(jù)更新為455,可以使用如下命令:set /zk-permanent 456
zk: Loca Lhost: 2181( CONNECTED)3] set /zk-permanent 456
Iczxid 0X12
Ctime Sat Mar 07 18: 11: 14 CST 2020
Imzxid =0x13
Mtime =Sat Mar 07 18: 13: 48 CST 2020
Zxid = 0x12
cversion =O
ldataversion 1
laclversion =0
ephemera lowner 0x0
ldatalength =3
numchildren =0
刪除節(jié)點
使用 delete 命令可以刪除 Zookeeper上的指定節(jié)點,用法如下
delete path [version]
其中 version也是表示數(shù)據(jù)版本,使用 delete /zk-permanent 命令即可刪除 zk-permanent節(jié)點
zk 的 Java 客戶端工具
zk 的 Java 客戶端工具 curator
創(chuàng)建節(jié)點
獲取數(shù)據(jù)
// 普通查詢
client.getData().forPath(path);
// 包含狀態(tài)查詢
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);
更新數(shù)據(jù)
// 普通更新
client.setData().forPath(path,"新內(nèi)容".getBytes());
// 指定版本更新
client.setData().withVersion(1).forPath(path);
刪除數(shù)據(jù)
配置存儲
命名服務
如果加了排它鎖則只對一個事務可見, 若加上共享鎖,則對所有事務可見.
作業(yè)
編程題一:
在基于 Netty 的自定義RPC的案例基礎上,進行改造。基于 Zookeeper 實現(xiàn)簡易版服務的注冊與發(fā)現(xiàn)機制
要求完成改造版本:
- 啟動 2 個服務端,可以將IP及端口信息自動注冊到 Zookeeper
- 客戶端啟動時,從Zookeeper中獲取所有服務提供端節(jié)點信息,客戶端與每一個服務端都建立連接
- 某個服務端下線后,Zookeeper注冊列表會自動剔除下線的服務端節(jié)點,客戶端與下線的服務端斷開連接
- 服務端重新上線,客戶端能感知到,并且與重新上線的服務端重新建立連接
編程題二:
基于作業(yè)一的基礎上,實現(xiàn)基于 Zookeeper 的簡易版負載均衡策略
要求完成改造版本:
- Zookeeper 記錄每個服務端的最后一次響應時間,有效時間為 5秒,5s內(nèi)如果該服務端沒有新的請求,響應時間清零或失效
- 當客戶端發(fā)起調用,每次都選擇最后一次響應時間短的服務端進行服務調用,如果時間一致,隨機選取一個服務端進行調用,從而實現(xiàn)負載均衡
編程題三:
基于Zookeeper實現(xiàn)簡易版配置中心
要求實現(xiàn)以下功能:
- 創(chuàng)建一個 Web 項目,將數(shù)據(jù)庫連接信息交給Zookeeper配置中心管理,即:當項目Web項目啟動時,從 Zookeeper 進行 MySQL 配置參數(shù)的拉取
- 要求項目通過數(shù)據(jù)庫連接池訪問MySQL(連接池可以自由選擇熟悉的)
- 當 Zookeeper 配置信息變化后Web項目自動感知,正確釋放之前連接池,創(chuàng)建新的連接池
作業(yè)資料說明:
1、提供資料:3個代碼工程、驗證及講解視頻。(倉庫中只有本次作業(yè)內(nèi)容)
2、講解內(nèi)容包含:題目分析、實現(xiàn)思路、代碼講解。
3、效果視頻驗證:
3.1 作業(yè)1:服務端的上線與下線,客戶端能動態(tài)感知,并能重新構成負載均衡。
3.2 作業(yè)2:作業(yè)完成情況下,選擇性能好的服務器處理(響應時間短的服務器即為性能好)。Zookeeper記錄客戶端響應有效時間為5s,超時判定該客戶端失效。
3.3 作業(yè)3:Zookeeper配置中心,web訪問數(shù)據(jù)庫需要從Zookeeper獲取連接資源。當Zookeeper配置發(fā)生改變,web自動切換到新的連接資源,保持正常訪問。
作業(yè)1
新增 NodeChangeListener 類
public interface NodeChangeListener {
/**
*
* @param serviceName 服務名稱
* @param serviceList 服務名稱對應節(jié)點下的所有子節(jié)點, 目前沒有用到
* @param pathChildrenCacheEvent
*/
void notify(String serviceName, List<String> serviceList, PathChildrenCacheEvent pathChildrenCacheEvent);
}
將 zk 的行為抽象成接口
public interface RpcRegistryHandler extends NodeChangeListener {
/**
* 服務端進行調用
*
* @param service
* @param ip
* @param port
* @return
*/
boolean registry(final String service, final String ip, final int port);
/**
* 客戶端進行調用
*
* @param service
* @return
*/
List<String> discovery(final String service);
void addListener(NodeChangeListener service);
void destroy();
}
ConfigKeeper 配置類
public class ConfigKeeper {
/**
* netty 的端口號
*/
private int nettyPort;
/**
* zk 地址: ip + 端口
*/
private String zkAddr;
/**
* 主動上報時間,單位 秒
*/
private int interval;
/**
* 區(qū)分是客戶端 還是 server 端, true 是服務端, false 是客戶端
*/
private boolean providerSide;
// 單例類,setter 和 getter 方法
}
新增 RpcResponse 類
package com.lagou;
public class RpcResponse {
/**
* 響應ID
*/
private String requestId;
/**
* 錯誤信息
*/
private String error;
/**
* 返回的結果
*/
private Object result;
// setter 和 getter 方法, toString方法
}
RpcServerHandler 類,這次主要對 channelRead 的方法內(nèi)容作了調整
/**
* 服務端將數(shù)據(jù) 寫入 客戶端, 繼續(xù)傳遞下去
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 222222
RpcRequest request = (RpcRequest) msg;
final RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setRequestId(request.getRequestId());
System.out.println("111 接收到" + request.getRequestId());
rpcResponse.setResult(handler(request));
// 3333333
ctx.writeAndFlush(rpcResponse);
}
服務端 rpc -server 用到的配置類 RpcServerConfig
public class RpcServerConfig {
private String nettyHost;
private int nettyPort;
private int delay;
/**
* 是否是服務端
*/
private boolean providerSide;
/**
* 應用的名稱
*/
private String applicationName;
private Map<String, Class> services;
// setter 和 getter 方法
}
RpcServer 類
自身 implements InitializingBean, DisposableBean 接口
Autowired 了 RpcRegistryFactory 對象
主要關注 afterPropertiesSet 和 destroy 方法即可
@Override
public void afterPropertiesSet() throws Exception {
this.initRpcServerConfig();
this.startServer();
}
@Override
public void destroy() {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
這里牽涉到了RpcRegistryFactory
/**
* 注冊中心工廠類
*/
@Component
public class RpcRegistryFactory implements FactoryBean<RpcRegistryHandler>, DisposableBean {
private RpcRegistryHandler rpcRegistryHandler;
@Override
public RpcRegistryHandler getObject() {
if (this.rpcRegistryHandler == null) {
rpcRegistryHandler = new ZkRegistryHandler(ConfigKeeper.getInstance().getZkAddr());
}
return rpcRegistryHandler;
}
@Override
public Class<?> getObjectType() {
System.out.println("RpcRegistryFactory ### getObjectType.....");
return RpcRegistryHandler.class;
}
@Override
public void destroy() {
System.out.println("RpcRegistryFactory ### destroy.....");
rpcRegistryHandler.destroy();
}
}
用于設計參數(shù)的 ProviderLoader
public class ProviderLoader {
private ProviderLoader() {
}
/**
* 返回類的全路徑名 -> 該類的 class
* @return
*/
public static Map<String, Class> getInstanceCacheMap() {
Map<String, Class> services = new HashMap<>();
services.put(IUserService.class.getName(), IUserService.class);
return services;
}
}
ZkRegistryHandler 是對 RpcRegistryHandler 接口的 zk 實現(xiàn)。
public class ZkRegistryHandler implements RpcRegistryHandler {
private static final String ZK_PATH_SPLITER = "/";
private static final String LAGOU_EDU_RPC_ZK_ROOT = ZK_PATH_SPLITER + "lg-rpc-provider" + ZK_PATH_SPLITER;
private List<NodeChangeListener> listenerList = new ArrayList<>();
private final String url;
private final CuratorFramework client;
private volatile boolean closed;
/**
* 子節(jié)點列表
*/
private List<String> serviceList;
private static final ScheduledExecutorService REPORT_WORKER = Executors.newScheduledThreadPool(5);
public ZkRegistryHandler(final String zkPath) {
url = zkPath;
this.client = CuratorFrameworkFactory.builder()
.connectString(zkPath)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
if (ConnectionState.CONNECTED.equals(connectionState)) {
System.out.println("注冊中心連接成功");
}
}
});
client.start();
// 定時上報
final ConfigKeeper configKeeper = ConfigKeeper.getInstance();
final boolean providerSide = configKeeper.isProviderSide();
final int interval = configKeeper.getInterval();
if (!providerSide && interval > 0) {
REPORT_WORKER.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println("我是一個定時任務");
// RequestMetri
}
}, interval, interval, TimeUnit.SECONDS);
}
}
/**
* 服務端注冊用到的方法
* @param serviceName
* @param nettyHost
* @param nettyPort
* @return
*/
@Override
public boolean registry(final String serviceName, final String nettyHost, final int nettyPort) {
String zkPath = providePath(serviceName);
if (!exists(zkPath)) {
create(zkPath, false);
}
// /lg-rpc-provider/com.lagou.server.IUserService/provider/localhost:8999
String instancePath = zkPath + ZK_PATH_SPLITER + nettyHost + ":" + nettyPort;
create(instancePath, true);
return true;
}
/**
* 客戶端查找服務的方法
*
* @param serviceName
* @return
*/
@Override
public List<String> discovery(final String serviceName) {
final String path = providePath(serviceName);
if (serviceList == null || serviceList.isEmpty()) {
System.out.println("首次查找地址");
try {
serviceList = client.getChildren().forPath(path);
} catch (Exception e) {
e.printStackTrace();
}
}
this.registryWatch(serviceName, path);
return serviceList;
}
@Override
public void addListener(NodeChangeListener listener) {
listenerList.add(listener);
}
@Override
public void destroy() {
client.close();
}
@Override
public void notify(String children, List<String> serviceList, PathChildrenCacheEvent pathChildrenCacheEvent) {
for (NodeChangeListener nodeChangeListener : listenerList) {
nodeChangeListener.notify(children, serviceList, pathChildrenCacheEvent);
}
}
private void create(final String path, final boolean ephemeral) {
CreateMode createMode;
if (ephemeral) {
createMode = CreateMode.EPHEMERAL;
} else {
createMode = CreateMode.PERSISTENT;
}
try {
client.create().creatingParentsIfNeeded().withMode(createMode).forPath(path);
} catch (KeeperException.NodeExistsException e) {
// do nothing
System.out.println("該路徑已存在" + path);
}
catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
private boolean exists(final String path) {
try {
if (client.checkExists().forPath(path) != null) {
return true;
}
} catch (KeeperException.NoNodeException e) {
// do nothing
} catch (Exception e) {
throw new RuntimeException(e);
}
return false;
}
/**
* 設置監(jiān)聽的方法
*
* @param serviceName
* @param path
*/
private void registryWatch(final String serviceName, final String path) {
PathChildrenCache nodeCache = new PathChildrenCache(client, path, true);
try {
nodeCache.getListenable().addListener((client, pathChildrenCacheEvent) -> {
// 更新本地緩存
serviceList = client.getChildren().forPath(path);
listenerList.forEach(nodeChangeListener -> {
System.out.println("節(jié)點變化");
nodeChangeListener.notify(serviceName, serviceList, pathChildrenCacheEvent);
});
});
nodeCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
} catch (Exception e) {
}
}
/**
* 返回 /lg-rpc-provider/com.lagou.server.IUserService/provider
* @param serviceName
* @return
*/
private String providePath(String serviceName) {
return LAGOU_EDU_RPC_ZK_ROOT + serviceName + ZK_PATH_SPLITER + "provider";
}
private String metricsPath() {
return LAGOU_EDU_RPC_ZK_ROOT + "metrics";
}
}
rpc-server 的啟動類
@SpringBootApplication
public class MyApplication {
public static void main(String[] args) {
// ["localhost:2181", "8999"]
// ["localhost:2181", "9000"]
final String zkPath = args[0];
final int nettyPort = Integer.parseInt(args[1]);
// 將IP及端口信息自動注冊到 Zookeeper
ConfigKeeper configKeeper = ConfigKeeper.getInstance();
configKeeper.setProviderSide(true);
configKeeper.setInterval(5);
configKeeper.setNettyPort(nettyPort);
configKeeper.setZkAddr(zkPath);
System.out.println(configKeeper);
SpringApplication.run(MyApplication.class, args);
// 可以通過 ls /lg-rpc-provider/com.lagou.server.IUserService/provider 查看節(jié)點信息
}
}
分別為 netty 啟動 8888 和 8900 端口

接下來講解 rpc-client
UserClientHandler 類可以復用
新增 RpcClient
主要對外暴露了 initClient 和 send 方法
// 2. 初始化netty客戶端(創(chuàng)建連接池 bootstrap, 設置 BootStrap 連接服務器)
public void initClient(String serviceClassName) throws InterruptedException {
// 創(chuàng)建連接池
this.group = new NioEventLoopGroup();
// 創(chuàng)建客戶端啟動類
Bootstrap bootstrap = new Bootstrap();
// 配置啟動引導類
bootstrap.group(group)
// 通道類型為 NIO
.channel(NioSocketChannel.class)
// 設置請求協(xié)議為 tcp
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
// 監(jiān)聽 channel 并初始化
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
// 獲取管道對象
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new RpcEncoder(RpcRequest.class, new JSONSerializer()));
pipeline.addLast(new RpcDecoder(RpcResponse.class, new JSONSerializer()));
// 自定義事件處理器
pipeline.addLast(new UserClientHandler());
}
});
this.channel = bootstrap.connect(this.nettyIp, this.nettyPort).sync().channel();
if (!isValidate()) {
close();
return;
}
System.out.println("啟動客戶端" + serviceClassName + ", ip = " + this.nettyIp + ", port = " + nettyPort);
}
public Object send(RpcRequest request) throws InterruptedException, ExecutionException {
// 統(tǒng)計請求時間
RequestMetrics.getInstance().put(nettyIp, this.nettyPort, request.getRequestId());
return this.channel.writeAndFlush(request).sync().get();
}
新增 RpcConsumer 類
主要有用的方法有構造方法,createProxy的方法,notify為類自身 實現(xiàn) NodeChangeListener 接口的方法(因為構造時會this.rpcRegistryHandler.addListener(this);)。
public class RpcConsumer implements NodeChangeListener {
private final RpcRegistryHandler rpcRegistryHandler;
private final Map<String, Class> serviceMap;
/**
* 服務名 -> List<RpcClient>
*/
private final Map<String, List<RpcClient>> CLIENT_POOL = new HashMap<>();
private LoadBalanceStrategy balanceStrategy = new RandomLoadBalance();
/**
* 初始化
* @param rpcRegistryHandler
* @param instanceCacheMap
*/
public RpcConsumer(final RpcRegistryHandler rpcRegistryHandler, final Map<String, Class> instanceCacheMap) {
this.rpcRegistryHandler = rpcRegistryHandler;
this.serviceMap = instanceCacheMap;
// 開始自動注冊消費者邏輯: accept 方法
serviceMap.forEach((className, clazz) -> {
List<RpcClient> rpcClients = CLIENT_POOL.get(className);
if (rpcClients == null) {
rpcClients = new ArrayList<>();
}
// 127.0.0.1:8999 127.0.0.1:9000
final List<String> discovery = this.rpcRegistryHandler.discovery(className);
for (String s : discovery) {
// s -> rpcClient
final String[] split = s.split(":");
String nettyIp = split[0];
int nettyPort = Integer.parseInt(split[1]);
final RpcClient rpcClient = new RpcClient(nettyIp, nettyPort);
try {
rpcClient.initClient(className);
} catch (InterruptedException e) {
e.printStackTrace();
}
rpcClients.add(rpcClient);
CLIENT_POOL.put(className, rpcClients);
}
});
this.rpcRegistryHandler.addListener(this);
}
// 4. 編寫一個方法,使用 jdk 動態(tài)代理對象
@SuppressWarnings("unchecked")
public <T> T createProxyEnhance(final Class<T> serverClass) {
return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class[]{serverClass}, (proxy, method, args) -> {
final String serverClassName = serverClass.getName();
// 封裝
final RpcRequest request = new RpcRequest();
final String requestId = UUID.randomUUID().toString().substring(0, 7);
String className = method.getDeclaringClass().getName();
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
request.setRequestId(requestId);
request.setClassName(className);
request.setMethodName(methodName);
request.setParameterTypes(parameterTypes);
request.setParameters(args);
System.out.println("******************************\n請求id = " + requestId + ", 請求方法名 = " + methodName + ", 請求參數(shù) = " + Arrays.toString(args));
RpcClient rpcClient = balanceStrategy.route(CLIENT_POOL, serverClassName);
if (rpcClient == null) {
System.out.println("沒找到對應服務端,返 NULL");
return null;
}
System.out.println(request);
// request 最終會客戶端發(fā)送給服務端進行消費
return rpcClient.send(request);
});
}
/**
* 監(jiān)聽臨時節(jié)點的變化
*
* @param service 服務名稱
* @param serviceList 服務名稱對應節(jié)點下的所有子節(jié)點
* @param pathChildrenCacheEvent
*/
@Override
public void notify(final String service, final List<String> serviceList,
final PathChildrenCacheEvent pathChildrenCacheEvent) {
// 取出變化的節(jié)點名稱, 例如為 /lg-rpc-provider/com.lagou.server.IUserService/provider/localhost:9000
final String path = pathChildrenCacheEvent.getData().getPath();
System.out.println("變化節(jié)點的路徑: " + path + ", 變化的類型: " + pathChildrenCacheEvent.getType());
// 分離出 ip:port 的組合。
final String instanceConfig = path.substring(path.lastIndexOf("/") + 1);
System.out.println("instanceConfig: " + instanceConfig);
final String[] address = instanceConfig.split(":");
System.out.println("address: " + address);
final String nettyIp = address[0];
final int nettyPort = Integer.parseInt(address[1]);
List<RpcClient> rpcClients = CLIENT_POOL.get(service);
switch (pathChildrenCacheEvent.getType()) {
// 增加節(jié)點
case CHILD_ADDED:
case CONNECTION_RECONNECTED:
if (rpcClients == null) {
rpcClients = new ArrayList<>();
}
final RpcClient rpcClient = new RpcClient(nettyIp, nettyPort);
try {
rpcClient.initClient(service);
} catch (InterruptedException e) {
e.printStackTrace();
}
rpcClients.add(rpcClient);
// 節(jié)點耗時統(tǒng)計
RequestMetrics.getInstance().addNode(nettyIp, nettyPort);
System.out.println("新增節(jié)點" + instanceConfig);
break;
// 增加節(jié)點
case CHILD_REMOVED:
case CONNECTION_SUSPENDED:
case CONNECTION_LOST:
if (rpcClients != null) {
for (RpcClient client : rpcClients) {
if (client.getNettyIp().equals(nettyIp) && client.getNettyPort() == nettyPort) {
rpcClients.remove(client);
// 節(jié)點耗時統(tǒng)計
RequestMetrics.getInstance().remoteNode(nettyIp, nettyPort);
System.out.println("移除節(jié)點" + instanceConfig);
break;
}
}
}
break;
}
}
}
最后再講講 ConsumerBootStrap, 該類基本沒啥改動
public class ConsumerBootStrap {
public static void main(final String[] args) throws Exception {
final ConfigKeeper configKeeper = ConfigKeeper.getInstance();
configKeeper.setZkAddr(args[0]);
// 之后會啟動一個定時的線程池,每 5s 上傳到注冊中心
configKeeper.setInterval(5);
configKeeper.setProviderSide(false);
final RpcRegistryHandler rpcRegistryHandler = new ZkRegistryHandler(configKeeper.getZkAddr());
System.out.println("客戶端 Zookeeper session established.");
// 最后一步
final RpcConsumer consumer = new RpcConsumer(rpcRegistryHandler, ProviderLoader.getInstanceCacheMap());
final IUserService userService = consumer.createProxyEnhance(IUserService.class);
while (true) {
Thread.sleep(4900);
final String result = userService.sayHello("are you ok?");
// 恒為 null
System.out.println("返回 = " + result);
}
}
}
作業(yè)2

1.消費者每次請求完成時更新最后一次請求耗時和系統(tǒng)時間
這部分工作主要在客戶端做。
首先介紹一下這次用到的兩個類
package com.lagou.boot;
public class Metrics {
private String nettyIp;
private int nettyPort;
private long start;
private Long cost;
public Metrics(String nettyIp, int nettyPort, long start, Long cost) {
this.nettyIp = nettyIp;
this.nettyPort = nettyPort;
this.start = start;
this.cost = cost;
}
public Metrics(String nettyIp, int nettyPort, long start) {
this(nettyIp, nettyPort, start, null);
}
public String getNettyIp() {
return nettyIp;
}
public void setNettyIp(String nettyIp) {
this.nettyIp = nettyIp;
}
public int getNettyPort() {
return nettyPort;
}
public void setNettyPort(int nettyPort) {
this.nettyPort = nettyPort;
}
public long getStart() {
return start;
}
public void setStart(long start) {
this.start = start;
}
public Long getCost() {
return cost;
}
public void setCost(Long cost) {
this.cost = cost;
}
}
RequestMetrics 類
COST_TIME_MAP變量 ip:端口 -》 耗時
REQUEST_ID_MAP變量 requestId -> ip + 端口 + 起始時間戳 + 耗時
calculate 方法用于 根據(jù)requestId 進行耗時統(tǒng)計
統(tǒng)計請求時間 在RpcClient的 send 方法中進行
public class RequestMetrics {
/**
* ip:端口 -》 耗時
*/
private static final ConcurrentHashMap<String, Long> COST_TIME_MAP = new ConcurrentHashMap<>();
/**
* requestId -> ip + 端口 + 起始時間戳 + 耗時
* 每個 requestId 用完一次后就會被銷毀
*/
private static final ConcurrentHashMap<String, Metrics> REQUEST_ID_MAP = new ConcurrentHashMap<>();
private static final RequestMetrics requestMetrics = new RequestMetrics();
public ConcurrentHashMap<String, Long> getMetricMap() {
return COST_TIME_MAP;
}
private RequestMetrics() {
}
public static RequestMetrics getInstance() {
return requestMetrics;
}
public void addNode(String nettyIp, int nettyPort) {
COST_TIME_MAP.put(nettyIp + ":" + nettyPort, 0L);
}
public void remoteNode(String nettyIp, int nettyPort) {
COST_TIME_MAP.remove(nettyIp + ":" + nettyPort);
}
/**
* 響應時放入, 根據(jù)requestId 進行耗時統(tǒng)計
* @param requestId
*/
public void calculate(String requestId) {
final Metrics metrics = REQUEST_ID_MAP.get(requestId);
Long cost = System.currentTimeMillis() - metrics.getStart();
COST_TIME_MAP.put(metrics.getNettyIp() + ":" + metrics.getNettyPort(), cost);
REQUEST_ID_MAP.remove(requestId);
}
/**
* 獲取所有節(jié)點耗時統(tǒng)計
*/
public List<Metrics> getAllInstances() {
List<Metrics> result = new ArrayList<>();
COST_TIME_MAP.forEach((url, aLong) -> {
String[] split = url.split(":");
result.add(new Metrics(split[0], Integer.parseInt(split[1]), aLong));
});
return result;
}
/**
* 請求時放入
* @param nettyIp
* @param nettyPort
* @param requestId
*/
public void put(String nettyIp, int nettyPort, String requestId) {
REQUEST_ID_MAP.put(requestId, new Metrics(nettyIp, nettyPort, System.currentTimeMillis(), null));
}
}
2.消費者定時在啟動時創(chuàng)建定時線程池,每隔5s自動上報,更新Zookeeper臨時節(jié)點的值
ConsumerBootStrap 入口有一個參數(shù)配置
// 之后會啟動一個定時的線程池,每 5s 上傳到注冊中心
configKeeper.setInterval(5);
ZkRegistryHandler 會開啟一個 ScheduledExecutorService 線程池服務
RequestMetrics 的
// 定時上報
final ConfigKeeper configKeeper = ConfigKeeper.getInstance();
final boolean providerSide = configKeeper.isProviderSide();
final int interval = configKeeper.getInterval();
if (!providerSide && interval > 0) {
REPORT_WORKER.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println("我是一個定時任務");
// ...
}
}, interval, interval, TimeUnit.SECONDS);
}
- 每次上報時判斷當前時間距離最后一次請求是否超過5s,超過5s則需要刪除Zookeeper上面的內(nèi)容
這里介紹下 RequestMetrics 的 getAllInstances() 方法, 如果 5 秒內(nèi)沒有響應清空請求時間。
/**
* 獲取所有節(jié)點耗時統(tǒng)計
*/
public List<Metrics> getAllInstances() {
List<Metrics> result = new ArrayList<>();
COST_TIME_MAP.forEach((url, aLong) -> {
String[] split = url.split(":");
result.add(new Metrics(split[0], Integer.parseInt(split[1]), aLong));
});
return result;
}
接下來簡單說一下負載均衡策略,這里主要涉及到了使用那個客戶端進行服務的請求。

public abstract class AbstractLoadBalanceStrategy implements LoadBalanceStrategy{
@Override
public RpcClient route(Map<String, List<RpcClient>> clientPool, String serverClassName) {
List<RpcClient> rpcClients = clientPool.get(serverClassName);
if (null == rpcClients) return null;
return doSelect(rpcClients);
}
protected abstract RpcClient doSelect(List<RpcClient> rpcClients);
}
MinCostLoadBalance (該類未經(jīng)驗證)
public class MinCostLoadBalance extends AbstractLoadBalanceStrategy {
@Override
protected RpcClient doSelect(final List<RpcClient> rpcClients) {
ConcurrentHashMap<String, Long> metricMap = RequestMetrics.getInstance().getMetricMap();
RpcClient minCostRpcClient = rpcClients.get(0);
final Long minLong = metricMap.get(minCostRpcClient.getNettyIp() + minCostRpcClient.getNettyPort());
for (int i = 1; i < rpcClients.size(); i++) {
RpcClient rpcClient = rpcClients.get(i);
String nettyIp = rpcClient.getNettyIp();
int nettyPort = rpcClient.getNettyPort();
// 取出最小響應時間的客戶端,并進行調用
Long aLong = metricMap.get(nettyIp + nettyPort);
if (aLong != null && aLong < minLong) {
minCostRpcClient = rpcClient;
}
}
return minCostRpcClient;
}
}
RandomLoadBalance (該類未經(jīng)驗證)
public class RandomLoadBalance extends AbstractLoadBalanceStrategy {
private final Random random = new Random();
@Override
protected RpcClient doSelect(List<RpcClient> rpcClients) {
int size = rpcClients.size();
int index = random.nextInt(size);
return rpcClients.get(index);
}
}

作業(yè)3
以下項目主要使用了 commons-dbcp + fastjson + apache.curator 技術進行實現(xiàn)。
這里會通過create [-s][-e] path data acl命令創(chuàng)建節(jié)點:
建立所需節(jié)點
我會將數(shù)據(jù)庫配置的用戶名和密碼等信息寫入/dbConfig/lagou.config.DbConfig 節(jié)點中。
先建立父節(jié)點
create /dbConfig ""
然后若不存在臨時節(jié)點則重新創(chuàng)建
# 向 /dbConfig/lagou.config.DbConfig 中寫入配置信息
create -e /dbConfig/lagou.config.DbConfig {"username":"root","password":"123456","url":"jdbc:mysql://localhost:3306/aaaa?serverTimezone=UTC"}
# 查看是否能正常獲取節(jié)點信息
get /dbConfig/lagou.config.DbConfig
更改數(shù)據(jù)
# 更改數(shù)據(jù)庫為 aaaa
set /dbConfig/lagou.config.DbConfig {"username":"root","password":"123456","url":"jdbc:mysql://localhost:3306/aaaa?serverTimezone=UTC"}
# 更改數(shù)據(jù)庫為 bbbb
set /dbConfig/lagou.config.DbConfig {"username":"root","password":"123456","url":"jdbc:mysql://localhost:3306/bbbb?serverTimezone=UTC"}
創(chuàng)建Java類
- 創(chuàng)建工具類 RuntimeContext
@Component
public class RuntimeContext implements ApplicationContextAware {
private static ApplicationContext applicationContext = null;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (RuntimeContext.applicationContext == null) {
RuntimeContext.applicationContext = applicationContext;
}
}
//獲取applicationContext
private static ApplicationContext getApplicationContext() {
return applicationContext;
}
//通過name獲取Bean
public static Object getBean(String name) {
return getApplicationContext().getBean(name);
}
// ...
}
- 實體類
package lagou.config;
public class DbConfig {
private String url;
private String username;
private String password;
// setter getter 方法
}
- 創(chuàng)建 MyDataSource 自定義數(shù)據(jù)源,我們重寫的dbcp中的類BasicDataSource,我們將其全部拷貝了出來,然后重命名為MyDataSource類,然后在其中修改了以下內(nèi)容
3.1將 UNKNOWN_TRANSACTIONISOLATION 的值改為 -1. 否則這個內(nèi)部變量會找不到
/**
* The default TransactionIsolation state of connections created by this pool.
*/
protected volatile int defaultTransactionIsolation = PoolableConnectionFactory.UNKNOWN_TRANSACTIONISOLATION;
3.2jdk 1.7 之后需要實現(xiàn)該方法 getParentLogger()
@Override
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
return null;
}
3.3 修改已有的 createDataSource() 方法,刪除這幾行代碼

3.4 新增 changeDataSource 方法
public static void changeDataSource() {
MyDataSource dataSource = (MyDataSource) RuntimeContext.getBean("dataSource");
try {
dataSource.close();
dataSource.createDataSource();
} catch (SQLException e) {
e.printStackTrace();
}
}
- 新建 InitListener 類,該類實現(xiàn)了ServletContextListener 來對Zookeeper節(jié)點 /db/url 的監(jiān)聽
public class InitListener implements ServletContextListener {
private static final String CONNENT_ADDR = "localhost:2181";
private static final String PATH = "/dbConfig";
private static final String SUB_PATH = PATH + "/" + DbConfig.class.getName();
@Override
public void contextInitialized(ServletContextEvent sce) {
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString(CONNENT_ADDR)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
curatorFramework.start();
PathChildrenCache nodeCache = new PathChildrenCache(curatorFramework, "/dbConfig", true);
try {
nodeCache.getListenable().addListener((client, pathChildrenCacheEvent) -> {
System.out.println(pathChildrenCacheEvent.getType());
if (PathChildrenCacheEvent.Type.CHILD_UPDATED.equals(pathChildrenCacheEvent.getType())) {
final ChildData data = pathChildrenCacheEvent.getData();
if (data != null) {
final String path = data.getPath();
System.out.println(path);
System.out.println(SUB_PATH);
if (path.equals(SUB_PATH)) {
MyDataSource datasource = (MyDataSource) RuntimeContext.getBean("dataSource");
final DbConfig dbConfig = JSON.parseObject(new String(data.getData()), DbConfig.class);
System.out.println(dbConfig.toString());
datasource.setUrl(dbConfig.getUrl());
datasource.setUsername(dbConfig.getUsername());
datasource.setPassword(dbConfig.getPassword());
MyDataSource.changeDataSource();
}
}
}
});
nodeCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void contextDestroyed(ServletContextEvent sce) {
}
}
- 修改spring boot 啟動類,注冊 InitListener ,配置 我們自定義的 DataSource,建立一個query方法(可通過/query 進行訪問)暴露出去。
@SpringBootApplication
@RestController
public class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
@RequestMapping("/query")
public String query() {
String sql = "select id from `info` limit 1";
return jdbcTemplate.queryForObject(sql, String.class);
}
@Bean
public ServletListenerRegistrationBean servletListenerRegistrationBean() {
ServletListenerRegistrationBean servletListenerRegistrationBean = new ServletListenerRegistrationBean();
servletListenerRegistrationBean.setListener(new InitListener());
return servletListenerRegistrationBean;
}
@Bean
public DataSource dataSource(@Value("${spring.datasource.url}") String url,
@Value("${spring.datasource.username}") String username,
@Value("${spring.datasource.password}") String password) {
MyDataSource dataSource = new MyDataSource();
dataSource.setUrl(url);
dataSource.setUsername(username);
dataSource.setPassword(password);
return dataSource;
}
@Autowired
private JdbcTemplate jdbcTemplate;
}
6.application.properties 配置
server.port=80
spring.datasource.url=jdbc:mysql://localhost:3306/aaaa?serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=123456
參考
基于Zookeeper動態(tài)切換數(shù)據(jù)源_BXS_0107的博客-CSDN博客
https://blog.csdn.net/newbie0107/article/details/105500579