站在巨人的肩膀上
- ContinuousQuery(簡稱CQ):持續(xù)查詢,是指Client可以按照OQL(Object Query Language)查詢語句注冊自己感興趣的event,而這些event將發(fā)送給Client的Listener,一旦Server有event發(fā)生,就會將此event傳遞給Client。
- 監(jiān)聽的事件類型:update create destroy
- CQ查詢的特性:
能夠使用標(biāo)準(zhǔn)的OQL語句
對CQ事件進(jìn)行管理
完全整合C/S架構(gòu)
基于數(shù)據(jù)值的訂閱
活躍查詢執(zhí)行
一個簡單的業(yè)務(wù)需求:
Client向Server訂閱監(jiān)聽年齡在15~35歲Customer之間的數(shù)據(jù):
Server
@SpringBootApplication
@CacheServerApplication(name = "GemFireContinuousQueryServer")
public class Application {
@Bean(name = "Customers")
PartitionedRegionFactoryBean<Long, Customer> customersRegion(GemFireCache gemfireCache) {
PartitionedRegionFactoryBean<Long, Customer> customers = new PartitionedRegionFactoryBean<>();
customers.setCache(gemfireCache);
customers.setClose(false);
customers.setPersistent(false);
return customers;
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
pom.xml 中的關(guān)鍵依賴:
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-gemfire</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.gj.demo</groupId>
<artifactId>gemfire-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
Client
@SpringBootApplication
@ClientCacheApplication(name = "GemFireContinuousQueryClient", subscriptionEnabled = true)
@SuppressWarnings("unused")
public class Application {
@Bean(name = "Customers")
ClientRegionFactoryBean<Long, Customer> customersRegion(GemFireCache gemfireCache) {
ClientRegionFactoryBean<Long, Customer> customers = new ClientRegionFactoryBean<>();
customers.setCache(gemfireCache);
customers.setClose(true);
customers.setShortcut(ClientRegionShortcut.PROXY);
return customers;
}
@Bean
ContinuousQueryListenerContainer continuousQueryListenerContainer(GemFireCache gemfireCache) {
Region<Long, Customer> customers = gemfireCache.getRegion("/Customers");
ContinuousQueryListenerContainer container = new ContinuousQueryListenerContainer();
container.setCache(gemfireCache);
container.setQueryListeners(asSet(ageQueryDefinition(customers, 15,35)));
return container;
}
private ContinuousQueryDefinition ageQueryDefinition(Region<Long, Customer> customers, int
ageFrom,int ageTo){
String query = String.format("SELECT * FROM /Customers c WHERE c.getAge().intValue() > %d AND c.getAge().intValue() < %d ", ageFrom,ageTo);
return new ContinuousQueryDefinition("Young Query ",query,newQueryListener(customers,"Young Query"));
}
private ContinuousQueryListener newQueryListener(Region<Long, Customer> customers, String qualifier) {
return event -> {
System.err.printf("new order!" + event.toString());
};
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
Customer:
@Region
public class Customer implements Serializable {
private static final long serialVersionUID = -3860687524824507124L;
private String firstname, lastname;
private int age;
Long id;
//此處省略了get,set方法
@Override
public String toString() {
return String.format("%1$s %2$s %3$s", getFirstname(), getLastname(),getAge());
}
}
程序跑起來~
分別啟動Server和Client,在瀏覽器傳入相關(guān)參數(shù):

查看Client的控制臺日志:


至此,就是一個完整的Client向Server訂閱監(jiān)聽,收到訂閱消息的全過程。
CQ查詢的數(shù)據(jù)流
當(dāng)數(shù)據(jù)條目在服務(wù)器端更新時,新數(shù)據(jù)會經(jīng)過下面的步驟:
1.region條目發(fā)生變更
2.每一個事件,服務(wù)器的CQ處理框架檢查是否與運行的CQ匹配
3.如果數(shù)據(jù)條目的變更匹配了CQ查詢,CQ事件將被發(fā)送到客戶端上的CQ監(jiān)聽器,CQ監(jiān)聽器獲得此事件。

如上圖所示:
X條目新值和舊值都匹配了CQ查詢,因此查詢結(jié)果的更新的事件被發(fā)送出來。
Y條目舊值匹配了,但這是查詢結(jié)果的一部分,Y條目操作為失敗 ,因為查詢結(jié)果被銷毀的事件被發(fā)送出來。
Z條目為新創(chuàng),并不匹配CQ事件,所以事件不發(fā)送。
值得注意的是,CQ并不更新客戶端的Region,CQ作為CQ監(jiān)聽器的通告工具而服務(wù),CQ監(jiān)聽器可以按照客戶應(yīng)用的要求任意編程。
當(dāng)一個CQ運行在服務(wù)器Region的時候,每一個Server條目更新線程都放在CQ查詢中,如果old value或者new value 滿足查詢條件,線程將放到CqEvent的Client隊列中去,一旦Client接受了此事件,CqEvent將被傳遞到CqListeners的onEvent方法上,如下圖所示:

QueryService 接口提供的方法
create a new CQ and specify whether it is durable
execute a CQ,with or without an initial set
list all the CQs registered by the client
close and stop CQs at the cache and region level
get a handle on CqStatistics for the client
CqQuery:管理持續(xù)查詢的方法,通過QueryService 創(chuàng)建,用于開啟和停止CQ執(zhí)行,同時查詢其他與CQ想關(guān)聯(lián)的對象,such as CQ屬性,CQ統(tǒng)計和CQ狀態(tài)。
CqListener:用于處理持續(xù)查詢的事件。
*CqEvent:提供了從Server發(fā)送的所有的CQ事件信息,此事件被傳遞到CqListener的onEvent方法。
程序媛小白一枚,如有錯誤,煩請批評指正!(#.#)