Gemfire CQ 之持續(xù)查詢

站在巨人的肩膀上

  • ContinuousQuery(簡稱CQ):持續(xù)查詢,是指Client可以按照OQL(Object Query Language)查詢語句注冊自己感興趣的event,而這些event將發(fā)送給Client的Listener,一旦Server有event發(fā)生,就會將此event傳遞給Client。
  • 監(jiān)聽的事件類型:update create destroy
  • CQ查詢的特性:
    能夠使用標(biāo)準(zhǔn)的OQL語句
    對CQ事件進(jìn)行管理
    完全整合C/S架構(gòu)
    基于數(shù)據(jù)值的訂閱
    活躍查詢執(zhí)行

一個簡單的業(yè)務(wù)需求:

Client向Server訂閱監(jiān)聽年齡在15~35歲Customer之間的數(shù)據(jù):

Server

@SpringBootApplication
@CacheServerApplication(name = "GemFireContinuousQueryServer")
public class Application {

    @Bean(name = "Customers")
    PartitionedRegionFactoryBean<Long, Customer> customersRegion(GemFireCache gemfireCache) {
        PartitionedRegionFactoryBean<Long, Customer> customers = new PartitionedRegionFactoryBean<>();
        customers.setCache(gemfireCache);
        customers.setClose(false);
        customers.setPersistent(false);
        return customers;
    }
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

pom.xml 中的關(guān)鍵依賴:

        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-gemfire</artifactId>
            <version>2.0.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.gj.demo</groupId>
            <artifactId>gemfire-common</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

Client

@SpringBootApplication
@ClientCacheApplication(name = "GemFireContinuousQueryClient", subscriptionEnabled = true)
@SuppressWarnings("unused")
public class Application {
    @Bean(name = "Customers")
    ClientRegionFactoryBean<Long, Customer> customersRegion(GemFireCache gemfireCache) {
        ClientRegionFactoryBean<Long, Customer> customers = new ClientRegionFactoryBean<>();
        customers.setCache(gemfireCache);
        customers.setClose(true);
        customers.setShortcut(ClientRegionShortcut.PROXY);
        return customers;
    }

    @Bean
    ContinuousQueryListenerContainer continuousQueryListenerContainer(GemFireCache gemfireCache) {
        Region<Long, Customer> customers = gemfireCache.getRegion("/Customers");
        ContinuousQueryListenerContainer container = new ContinuousQueryListenerContainer();
        container.setCache(gemfireCache);
        container.setQueryListeners(asSet(ageQueryDefinition(customers, 15,35)));
        return container;
    }
    private ContinuousQueryDefinition ageQueryDefinition(Region<Long, Customer> customers, int 
   ageFrom,int ageTo){
        String query = String.format("SELECT * FROM /Customers c WHERE c.getAge().intValue() > %d AND c.getAge().intValue() < %d ", ageFrom,ageTo);
        return new ContinuousQueryDefinition("Young Query ",query,newQueryListener(customers,"Young Query"));
    }

    private ContinuousQueryListener newQueryListener(Region<Long, Customer> customers, String qualifier) {
        return event -> {
            System.err.printf("new order!" + event.toString());
        };
    }
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

Customer:

@Region
public class Customer implements Serializable {
    private static final long serialVersionUID = -3860687524824507124L;
    private String firstname, lastname;
    private int age;
    Long id;
//此處省略了get,set方法
@Override
    public String toString() {
        return String.format("%1$s %2$s %3$s", getFirstname(), getLastname(),getAge());
    }
}

程序跑起來~

分別啟動Server和Client,在瀏覽器傳入相關(guān)參數(shù):


server.png

查看Client的控制臺日志:


Client-Terminal.png

Client.png

至此,就是一個完整的Client向Server訂閱監(jiān)聽,收到訂閱消息的全過程。

CQ查詢的數(shù)據(jù)流

當(dāng)數(shù)據(jù)條目在服務(wù)器端更新時,新數(shù)據(jù)會經(jīng)過下面的步驟:
1.region條目發(fā)生變更
2.每一個事件,服務(wù)器的CQ處理框架檢查是否與運行的CQ匹配
3.如果數(shù)據(jù)條目的變更匹配了CQ查詢,CQ事件將被發(fā)送到客戶端上的CQ監(jiān)聽器,CQ監(jiān)聽器獲得此事件。


CQ stream.png

如上圖所示:
X條目新值和舊值都匹配了CQ查詢,因此查詢結(jié)果的更新的事件被發(fā)送出來。
Y條目舊值匹配了,但這是查詢結(jié)果的一部分,Y條目操作為失敗 ,因為查詢結(jié)果被銷毀的事件被發(fā)送出來。
Z條目為新創(chuàng),并不匹配CQ事件,所以事件不發(fā)送。

值得注意的是,CQ并不更新客戶端的Region,CQ作為CQ監(jiān)聽器的通告工具而服務(wù),CQ監(jiān)聽器可以按照客戶應(yīng)用的要求任意編程。

當(dāng)一個CQ運行在服務(wù)器Region的時候,每一個Server條目更新線程都放在CQ查詢中,如果old value或者new value 滿足查詢條件,線程將放到CqEvent的Client隊列中去,一旦Client接受了此事件,CqEvent將被傳遞到CqListeners的onEvent方法上,如下圖所示:


CQ.png

QueryService 接口提供的方法

  • create a new CQ and specify whether it is durable

  • execute a CQ,with or without an initial set

  • list all the CQs registered by the client

  • close and stop CQs at the cache and region level

  • get a handle on CqStatistics for the client

  • CqQuery:管理持續(xù)查詢的方法,通過QueryService 創(chuàng)建,用于開啟和停止CQ執(zhí)行,同時查詢其他與CQ想關(guān)聯(lián)的對象,such as CQ屬性,CQ統(tǒng)計和CQ狀態(tài)。

  • CqListener:用于處理持續(xù)查詢的事件。
    *CqEvent:提供了從Server發(fā)送的所有的CQ事件信息,此事件被傳遞到CqListener的onEvent方法。

程序媛小白一枚,如有錯誤,煩請批評指正!(#.#)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容