SpringBoot整合Redisson

Redisson的Github地址:https://github.com/redisson/redisson/wiki/Table-of-Content

1、添加依賴

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <version>2.4.1</version>
            <exclusions>
                <exclusion>
                    <groupId>io.lettuce</groupId>
                    <artifactId>lettuce-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.13.6</version>
        </dependency>

2、新建配置類

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;

/**
 * @author ZhaoBW
 * @version 1.0
 * @date 2021/1/25 14:24
 */
@Configuration
public class MyRedissonConfig {

    @Bean(destroyMethod="shutdown")
    RedissonClient redisson() throws IOException {
        //1、創(chuàng)建配置
        Config config = new Config();
        config.useSingleServer()
                .setAddress("192.168.43.129:6379");
        return Redisson.create(config);
    }
    
}

3、分布式鎖

3.1、可重入鎖

基于Redis的Redisson分布式可重入鎖RLock對(duì)象實(shí)現(xiàn)了java.util.concurrent.locks.Lock接口。

    @RequestMapping("/redisson")
    public String testRedisson(){
        //獲取分布式鎖,只要鎖的名字一樣,就是同一把鎖
        RLock lock = redissonClient.getLock("lock");

        //加鎖(阻塞等待),默認(rèn)過(guò)期時(shí)間是30秒
        lock.lock();
        try{
            //如果業(yè)務(wù)執(zhí)行過(guò)長(zhǎng),Redisson會(huì)自動(dòng)給鎖續(xù)期
            Thread.sleep(1000);
            System.out.println("加鎖成功,執(zhí)行業(yè)務(wù)邏輯");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //解鎖,如果業(yè)務(wù)執(zhí)行完成,就不會(huì)繼續(xù)續(xù)期,即使沒有手動(dòng)釋放鎖,在30秒過(guò)后,也會(huì)釋放鎖
            lock.unlock();
        }

        return "Hello Redisson!";
    }

大家都知道,如果負(fù)責(zé)儲(chǔ)存這個(gè)分布式鎖的Redisson節(jié)點(diǎn)宕機(jī)以后,而且這個(gè)鎖正好處于鎖住的狀態(tài)時(shí),這個(gè)鎖會(huì)出現(xiàn)鎖死的狀態(tài)。為了避免這種情況的發(fā)生,Redisson內(nèi)部提供了一個(gè)監(jiān)控鎖的看門狗,它的作用是在Redisson實(shí)例被關(guān)閉前,不斷的延長(zhǎng)鎖的有效期。默認(rèn)情況下,看門狗的檢查鎖的超時(shí)時(shí)間是30秒鐘,也可以通過(guò)修改Config.lockWatchdogTimeout來(lái)另行指定。

在RedissonLock類的renewExpiration()方法中,會(huì)啟動(dòng)一個(gè)定時(shí)任務(wù)每隔30/3=10秒給鎖續(xù)期。如果業(yè)務(wù)執(zhí)行期間,應(yīng)用掛了,那么不會(huì)自動(dòng)續(xù)期,到過(guò)期時(shí)間之后,鎖會(huì)自動(dòng)釋放。

    private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); 
        ee.setTimeout(task);
    }

另外Redisson還提供了leaseTime的參數(shù)來(lái)指定加鎖的時(shí)間。超過(guò)這個(gè)時(shí)間后鎖便自動(dòng)解開了。

// 加鎖以后10秒鐘自動(dòng)解鎖
// 無(wú)需調(diào)用unlock方法手動(dòng)解鎖
lock.lock(10, TimeUnit.SECONDS);

// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動(dòng)解鎖
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);

如果指定了鎖的超時(shí)時(shí)間,底層直接調(diào)用lua腳本,進(jìn)行占鎖。如果超過(guò)leaseTime,業(yè)務(wù)邏輯還沒有執(zhí)行完成,則直接釋放鎖,所以在指定leaseTime時(shí),要讓leaseTime大于業(yè)務(wù)執(zhí)行時(shí)間。RedissonLock類的tryLockInnerAsync()方法

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
3.2、讀寫鎖

分布式可重入讀寫鎖允許同時(shí)有多個(gè)讀鎖和一個(gè)寫鎖處于加鎖狀態(tài)。在讀寫鎖中,讀讀共享、讀寫互斥、寫寫互斥。

RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常見的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();

讀寫鎖測(cè)試類,當(dāng)訪問(wèn)write接口時(shí),read接口會(huì)被阻塞住。

import org.redisson.api.RCountDownLatch;
import org.redisson.api.RLock;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;

/**
 * @author ZhaoBW
 * @version 1.0
 * @date 2021/1/23 20:12
 */
@RestController
public class TestController {

    @Autowired
    RedissonClient redissonClient;

    @Autowired
    StringRedisTemplate redisTemplate;

    @RequestMapping("/write")
    public String write(){
        RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("wr-lock");
        RLock writeLock = readWriteLock.writeLock();
        String s = UUID.randomUUID().toString();
        writeLock.lock();
        try {
            redisTemplate.opsForValue().set("wr-lock-key", s);
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            writeLock.unlock();
        }
        return s;
    }

    @RequestMapping("/read")
    public String read(){
        RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("wr-lock");
        RLock readLock = readWriteLock.readLock();
        String s = "";
        readLock.lock();
        try {
            s = redisTemplate.opsForValue().get("wr-lock-key");
        } finally {
            readLock.unlock();
        }
        return s;
    }
}
3.3、信號(hào)量(Semaphore)

Redisson的分布式信號(hào)量與的用法與java.util.concurrent.Semaphore相似

RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync();

現(xiàn)在redis中保存semaphore的值為3


Redisson1.jpg

然后在TestController中添加測(cè)試方法:

    @RequestMapping("/releaseSemaphore")
    public String releaseSemaphore(){
        RSemaphore semaphore = redissonClient.getSemaphore("semaphore");
        semaphore.release();
        return "release success";
    }

    @RequestMapping("/acquireSemaphore")
    public String acquireSemaphore() throws InterruptedException {
        RSemaphore semaphore = redissonClient.getSemaphore("semaphore");
        semaphore.acquire();
        return "acquire success";
    }

當(dāng)訪問(wèn)acquireSemaphore接口時(shí),redis中的semaphore會(huì)減1;訪問(wèn)releaseSemaphore接口時(shí),redis中的semaphore會(huì)加1。當(dāng)redis中的semaphore為0時(shí),繼續(xù)訪問(wèn)acquireSemaphore接口,會(huì)被阻塞,直到訪問(wèn)releaseSemaphore接口,使得semaphore>0,acquireSemaphore才會(huì)繼續(xù)執(zhí)行。

3.4、閉鎖(CountDownLatch)

CountDownLatch作用:某一線程,等待其他線程執(zhí)行完畢之后,自己再繼續(xù)執(zhí)行。

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();

// 在其他線程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();

在TestController中添加測(cè)試方法,訪問(wèn)close接口時(shí),調(diào)用await()方法進(jìn)入阻塞狀態(tài),直到有三次訪問(wèn)release接口時(shí),close接口才會(huì)返回。

    @RequestMapping("/close")
    public String close() throws InterruptedException {
        RCountDownLatch close = redissonClient.getCountDownLatch("close");
        close.trySetCount(3);
        close.await();
        return "close";
    }

    @RequestMapping("/release")
    public String release(){
        RCountDownLatch close = redissonClient.getCountDownLatch("close");
        close.countDown();
        return "release";
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容