實(shí)操Redission 分布式對(duì)象(一)

公共講解

RedissonClient(同步異步)、RedissonReactiveClient(反射式Reactive
)和RedissonRxClient(RxJava2)實(shí)例本身和Redisson提供的所有分布式對(duì)象都是線程安全的

1.首先先進(jìn)行一步簡(jiǎn)單的配置redis的步驟

Config config = new Config();
SingleServerConfig singleServerConfig = config.useSingleServer()
        .setAddress("redis://" + StringUtils.trim("10.6.30.41:6379"))
        .setDatabase(8)
        .setTimeout(3000)
        .setConnectionPoolSize(64)
        .setConnectionMinimumIdleSize(10);
    singleServerConfig.setPassword("123456");
RedissonClient client = Redisson.create(config);

2.了解常用RKeys的API操作

RKeys keys = client.getKeys();
//獲取所有key值
Iterable<String> allKeys = keys.getKeys();
//獲取所有模糊key值
Iterable<String> foundedKeys = keys.getKeysByPattern("key");
//刪除多個(gè)key值
long numOfDeletedKeys = keys.delete("obj1", "obj2", "obj3");
//刪除模糊key值
long deletedKeysAmount = keys.deleteByPattern("test?");
//隨機(jī)獲取key
String randomKey = keys.randomKey();
//當(dāng)前多少key數(shù)
long keysAmount = keys.count();

3.通用對(duì)象桶(Object Bucket)

Redisson的分布式RBucketJava對(duì)象是一種通用對(duì)象桶可以用來存放任類型的對(duì)象。 除了同步接口外,還提供了異步(Async)、反射式(Reactive)和RxJava2標(biāo)準(zhǔn)的接口。

//對(duì)象桶 單個(gè)元素的存儲(chǔ)
RBucket<String> bucket = client.getBucket("REDIS_BUCKET_TEST");
//同步 存儲(chǔ)元素進(jìn)對(duì)象桶,無返回值
bucket.set("1");
//異步
bucket.setAsync("1");
//返回bucket中key的值,返回1
bucket.get();
//嘗試對(duì)REDIS_BUCKET_TEST存值,如果已存在值就不存進(jìn)去,存在就存進(jìn)去
// (先查詢是否存在,true則存入,false則返回)
boolean bl = bucket.trySet("2");
//第一個(gè)參數(shù)與存的值比較,相等就存入第二個(gè)參數(shù)
boolean bl = bucket.compareAndSet("2", "3");
//get返回并且存入新的值
String str =  bucket.getAndSet("4");

還可以通過RBuckets接口實(shí)現(xiàn)批量操作多個(gè)RBucket對(duì)象:

RBuckets buckets = client.getBuckets();
//獲取多個(gè)buckets
Map<String, String> loadedBuckets = buckets.get("REDIS_BUCKET_TEST", "REDIS_BUCKET_TEST1", "REDIS_BUCKET_TEST2");
Map<String, Object> map = new HashMap<>();
map.put("REDIS_BUCKET_TEST", "0");
map.put("REDIS_BUCKET_TEST1","1");
// 利用Redis的事務(wù)特性,同時(shí)保存所有的通用對(duì)象桶,如果任意一個(gè)通用對(duì)象桶已經(jīng)存在則放棄保存其他所有數(shù)據(jù)。
buckets.trySet(map);
// 同時(shí)保存全部通用對(duì)象桶。
buckets.set(map);

4.二進(jìn)制流(Binary Stream)

Redisson的分布式RBinaryStream Java對(duì)象同時(shí)提供了InputStream接口和OutputStream接口的實(shí)現(xiàn)。流的最大容量受Redis主節(jié)點(diǎn)的內(nèi)存大小限制。

RBinaryStream stream = client.getBinaryStream("anyStream");
byte[] content = "ceshi".getBytes();
//設(shè)置流內(nèi)容ceshi
stream.set(content);
InputStream is = stream.getInputStream();
byte[] readBuffer = new byte[512];
//讀取流ceshi
is.read(readBuffer);
OutputStream os = stream.getOutputStream();
byte[] contentToWrite = "ceshi1".getBytes();
//寫入流ceshi1,注意:不是覆蓋是寫入,最終流為ceshiceshi1
os.write(contentToWrite);

5.地理空間對(duì)象桶(Geospatial Bucket)

RGeo<String> geo = client.getGeo("test");
//同步存儲(chǔ)可存儲(chǔ)多個(gè)
geo.add(new GeoEntry(13.361389, 38.115556, "Palermo"),
        new GeoEntry(15.087269, 37.502669, "Catania"));
//異步存儲(chǔ)只能一個(gè)
geo.addAsync(37.618423, 55.751244, "Moscow");
//返回兩個(gè)坐標(biāo)的距離
Double distance = geo.dist("Palermo", "Catania", GeoUnit.METERS);
//返回成員的地理位置
Map<String, GeoPosition> positions = geo.pos("test2", "Palermo", "test3", "Catania", "test1");
//暫時(shí)沒研究
RFuture<Map<String, String>> future= geo.hashAsync("Palermo", "Catania");
List<String> cities = geo.radius(15, 37, 200, GeoUnit.KILOMETERS);
Map<String, GeoPosition> citiesWithPositions = geo.radiusWithPosition(15, 37, 200, GeoUnit.KILOMETERS);

6.BitSet(暫時(shí)還不理解)

Redisson的分布式RBitSetJava對(duì)象采用了與java.util.BiteSet類似結(jié)構(gòu)的設(shè)計(jì)風(fēng)格??梢岳斫鉃樗且粋€(gè)分布式的可伸縮式位向量。需要注意的是RBitSet的大小受Redis限制,最大長(zhǎng)度為4 294 967 295。除了同步接口外,還提供了異步(Async)、反射式(Reactive)和RxJava2標(biāo)準(zhǔn)的接口。

RBitSet set = redisson.getBitSet("simpleBitset");
set.set(0, true);
set.set(1812, false);
set.clear(0);
set.addAsync("e");
set.xor("anotherBitset");

BitSet數(shù)據(jù)分片(Sharding)(分布式RoaringBitMap)

基于Redis的Redisson集群分布式BitSet通過RClusteredBitSet接口,為集群狀態(tài)下的Redis環(huán)境提供了BitSet數(shù)據(jù)分片的功能。通過優(yōu)化后更加有效的分布式RoaringBitMap算法,突破了原有的BitSet大小限制,達(dá)到了集群物理內(nèi)存容量大小。在這里可以獲取更多的內(nèi)部信息。

RClusteredBitSet set = redisson.getClusteredBitSet("simpleBitset");
set.set(0, true);
set.set(1812, false);
set.clear(0);
set.addAsync("e");
set.xor("anotherBitset");

7.原子整長(zhǎng)形(AtomicLong)

Redisson的分布式整長(zhǎng)形RAtomicLong對(duì)象和Java中的java.util.concurrent.atomic.AtomicLong對(duì)象類似。除了同步接口外,還提供了異步(Async)、反射式(Reactive)和RxJava2標(biāo)準(zhǔn)的接口。
個(gè)人理解:保證齊原子性,用了CAS算法樂觀鎖技術(shù),

RAtomicLong atomicLong = client.getAtomicLong("myAtomicLong");
//設(shè)置成3
atomicLong.set(3);
//原子地將當(dāng)前值增加1。
atomicLong.incrementAndGet();
//獲取值
atomicLong.get();

8.原子雙精度浮點(diǎn)(AtomicDouble)

Redisson還提供了分布式原子雙精度浮點(diǎn)RAtomicDouble,彌補(bǔ)了Java自身的不足。除了同步接口外,還提供了異步(Async)、反射式(Reactive)和RxJava2標(biāo)準(zhǔn)的接口。

RAtomicDouble atomicDouble = redisson.getAtomicDouble("myAtomicDouble");
//設(shè)置值
atomicDouble.set(2.81);
//加法并且返回加好后的值
atomicDouble.addAndGet(4.11);
//獲取值
atomicDouble.get();

9.話題(訂閱分發(fā))

Redisson的分布式話題RTopic對(duì)象實(shí)現(xiàn)了發(fā)布、訂閱的機(jī)制。除了同步接口外,還提供了異步(Async)、反射式(Reactive)和RxJava2標(biāo)準(zhǔn)的接口。

RTopic topic = redisson.getTopic("anyTopic");
topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
    @Override
    public void onMessage(String channel, SomeObject message) {
        //...
    }
});

// 在其他線程或JVM節(jié)點(diǎn)
RTopic topic = redisson.getTopic("anyTopic");
long clientsReceivedMessage = topic.publish(new SomeObject());

在Redis節(jié)點(diǎn)故障轉(zhuǎn)移(主從切換)或斷線重連以后,所有的話題監(jiān)聽器將自動(dòng)完成話題的重新訂閱。

模糊話題

Redisson的模糊話題RPatternTopic對(duì)象可以通過正式表達(dá)式來訂閱多個(gè)話題。除了同步接口外,還提供了異步(Async)、反射式(Reactive)和RxJava2標(biāo)準(zhǔn)的接口。

// 訂閱所有滿足`topic1.*`表達(dá)式的話題
RPatternTopic topic1 = redisson.getPatternTopic("topic1.*");
int listenerId = topic1.addListener(Message.class, new PatternMessageListener<Message>() {
    @Override
    public void onMessage(String pattern, String channel, Message msg) {
         Assert.fail();
    }
});

在Redis節(jié)點(diǎn)故障轉(zhuǎn)移(主從切換)或斷線重連以后,所有的模糊話題監(jiān)聽器將自動(dòng)完成話題的重新訂閱。

10.布隆過濾器(Bloom Filter)

Redisson利用Redis實(shí)現(xiàn)了Java分布式布隆過濾器(Bloom Filter)。所含最大比特?cái)?shù)量為2^32。

RBloomFilter<SomeObject> bloomFilter = redisson.getBloomFilter("sample");
// 初始化布隆過濾器,預(yù)計(jì)統(tǒng)計(jì)元素?cái)?shù)量為55000000,期望誤差率為0.03
bloomFilter.tryInit(55000000L, 0.03);
bloomFilter.add(new SomeObject("field1Value", "field2Value"));
bloomFilter.add(new SomeObject("field5Value", "field8Value"));
bloomFilter.contains(new SomeObject("field1Value", "field8Value"));

6.8.1. 布隆過濾器數(shù)據(jù)分片(Sharding)

基于Redis的Redisson集群分布式布隆過濾器通過RClusteredBloomFilter接口,為集群狀態(tài)下的Redis環(huán)境提供了布隆過濾器數(shù)據(jù)分片的功能。 通過優(yōu)化后更加有效的算法,通過壓縮未使用的比特位來釋放集群內(nèi)存空間。每個(gè)對(duì)象的狀態(tài)都將被分布在整個(gè)集群中。所含最大比特?cái)?shù)量為2^64。在這里可以獲取更多的內(nèi)部信息。

RClusteredBloomFilter<SomeObject> bloomFilter = redisson.getClusteredBloomFilter("sample");
// 采用以下參數(shù)創(chuàng)建布隆過濾器
// expectedInsertions = 255000000
// falseProbability = 0.03
bloomFilter.tryInit(255000000L, 0.03);
bloomFilter.add(new SomeObject("field1Value", "field2Value"));
bloomFilter.add(new SomeObject("field5Value", "field8Value"));
bloomFilter.contains(new SomeObject("field1Value", "field8Value"));

該功能僅限于Redisson PRO版本。

11.基數(shù)估計(jì)算法(HyperLogLog)

實(shí)用場(chǎng)景:概率數(shù)據(jù)結(jié)構(gòu)使您可以以極高的空間效率維護(hù)數(shù)百萬個(gè)項(xiàng)目的計(jì)數(shù)。
Redisson利用Redis實(shí)現(xiàn)了Java分布式基數(shù)估計(jì)算法(HyperLogLog)對(duì)象。該對(duì)象可以在有限的空間內(nèi)通過概率算法統(tǒng)計(jì)大量的數(shù)據(jù)。除了同步接口外,還提供了異步(Async)、反射式(Reactive)和RxJava2標(biāo)準(zhǔn)的接口。

RHyperLogLog<Integer> log = redisson.getHyperLogLog("log");
log.add(1);
log.add(2);
log.add(3);

log.count();

12. 整長(zhǎng)型累加器(LongAdder)

基于Redis的Redisson分布式整長(zhǎng)型累加器(LongAdder)采用了與java.util.concurrent.atomic.LongAdder類似的接口。通過利用客戶端內(nèi)置的LongAdder對(duì)象,為分布式環(huán)境下遞增和遞減操作提供了很高得性能。據(jù)統(tǒng)計(jì)其性能最高比分布式AtomicLong對(duì)象快 12000 倍。完美適用于分布式統(tǒng)計(jì)計(jì)量場(chǎng)景。

RLongAdder atomicLong = redisson.getLongAdder("myLongAdder");
atomicLong.add(12);
atomicLong.increment();
atomicLong.decrement();
atomicLong.sum();

當(dāng)不再使用整長(zhǎng)型累加器對(duì)象的時(shí)候應(yīng)該自行手動(dòng)銷毀,如果Redisson對(duì)象被關(guān)閉(shutdown)了,則不用手動(dòng)銷毀。

RLongAdder atomicLong = ...
atomicLong.destroy();

13.雙精度浮點(diǎn)累加器(DoubleAdder)

基于Redis的Redisson分布式雙精度浮點(diǎn)累加器(DoubleAdder)采用了與java.util.concurrent.atomic.DoubleAdder類似的接口。通過利用客戶端內(nèi)置的DoubleAdder對(duì)象,為分布式環(huán)境下遞增和遞減操作提供了很高得性能。據(jù)統(tǒng)計(jì)其性能最高比分布式AtomicDouble對(duì)象快 12000 倍。完美適用于分布式統(tǒng)計(jì)計(jì)量場(chǎng)景。

RLongDouble atomicDouble = redisson.getLongDouble("myLongDouble");
atomicDouble.add(12);
atomicDouble.increment();
atomicDouble.decrement();
atomicDouble.sum();

當(dāng)不再使用雙精度浮點(diǎn)累加器對(duì)象的時(shí)候應(yīng)該自行手動(dòng)銷毀,如果Redisson對(duì)象被關(guān)閉(shutdown)了,則不用手動(dòng)銷毀。

RLongDouble atomicDouble = ...

_b6d2063_
atomicDouble.destroy();

13.限流器(RateLimiter)

基于Redis的分布式限流器(RateLimiter)可以用來在分布式環(huán)境下現(xiàn)在請(qǐng)求方的調(diào)用頻率。既適用于不同Redisson實(shí)例下的多線程限流,也適用于相同Redisson實(shí)例下的多線程限流。該算法不保證公平性。除了同步接口外,還提供了異步(Async)、反射式(Reactive)和RxJava2標(biāo)準(zhǔn)的接口。

RRateLimiter rateLimiter = redisson.getRateLimiter("myRateLimiter");
// 初始化
// 最大流速 = 每1秒鐘產(chǎn)生10個(gè)令牌
rateLimiter.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS);

CountDownLatch latch = new CountDownLatch(2);
limiter.acquire(3);
// ...

Thread t = new Thread(() -> {
    limiter.acquire(2);
    // ...        
});
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容