Redisson源碼解讀-分布式鎖

前言Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory Data Grid) 。Redisson有一樣功能是可重入的分布式鎖 。本文來討論一下這個功能的特點以及源碼分析 。
前置知識在講Redisson,咱們先來聊聊分布式鎖的特點以及Redis的發布/訂閱機制,磨刀不誤砍柴工 。
分布式鎖的思考首先思考下,如果我們自己去實現一個分布式鎖,這個鎖需要具備哪些功能?

  1. 互斥(這是一個鎖最基本的功能)
  2. 鎖失效機制(也就是可以設置鎖定時長,防止死鎖)
  3. 高性能、高可用
  4. 阻塞、非阻塞
  5. 可重入、公平鎖
  6. 。。。
可見,實現一個分布式鎖,需要考慮的東西有很多 。那么 , 如果用Redis來實現分布式鎖呢?如果只需要具備上面說的1、2點功能,要怎么寫?(ps:我就不寫了,自己想去)
Redis訂閱/發布機制Redisson中用到了Redis的訂閱/發布機制,下面簡單介紹下 。
簡單來說就是如果client2 、 client5 和 client1 訂閱了 channel1,當有消息發布到 channel1 的時候 , client2 、 client5 和 client1 都會收到這個消息 。
Redisson源碼解讀-分布式鎖

文章插圖
圖片來自 菜鳥教程-Redis發布訂閱
Redisson源碼版本:3.17.7
下面以Redisson官方的可重入同步鎖例子為入口,解讀下源碼 。
RLock lock = redisson.getLock("anyLock");// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);if (res) {try {...} finally {lock.unlock();}}加鎖我用時序圖來表示加鎖和訂閱的過程 。時序圖中括號后面的c1、c2代表client1,client2
Redisson源碼解讀-分布式鎖

文章插圖
當線程2獲取了鎖但還沒釋放鎖時,如果線程1去獲取鎖,會阻塞等待 , 直到線程2解鎖,通過Redis的發布訂閱機制喚醒線程1,再次去獲取鎖 。
加鎖方法是 lock.tryLock(100, 10, TimeUnit.SECONDS),對應著就是RedissonLock#tryLock
/** * 獲取鎖 * @param waitTime嘗試獲取鎖的最大等待時間,超過這個值,則認為獲取鎖失敗 * @param leaseTime 鎖的持有時間,超過這個時間鎖會自動失效(值應設置為大于業務處理的時間 , 確保在鎖有效期內業務能處理完) * @param unit 時間單位 * @return 獲取鎖成功返回true , 失敗返回false */@Overridepublic boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();// 當前時間long threadId = Thread.currentThread().getId();// 當前線程id// 嘗試加鎖,加鎖成功返回null,失敗返回鎖的剩余超時時間Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// 獲取鎖成功if (ttl == null) {return true;}// time小于0代表此時已經超過獲取鎖的等待時間,直接返回falsetime -= System.currentTimeMillis() - current;if (time <= 0) {// 沒看懂這個方法,里面里面空運行,有知道的大神還請不吝賜教acquireFailed(waitTime, unit, threadId);return false;}current = System.currentTimeMillis();CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);try {subscribeFuture.get(time, TimeUnit.MILLISECONDS);} catch (TimeoutException e) {if (!subscribeFuture.cancel(false)) {subscribeFuture.whenComplete((res, ex) -> {// 出現異常 , 取消訂閱if (ex == null) {unsubscribe(res, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;} catch (ExecutionException e) {acquireFailed(waitTime, unit, threadId);return false;}try {// 判斷是否超時(超過了waitTime)time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}while (true) {// 再次獲取鎖,成功則返回long currentTime = System.currentTimeMillis();ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// 阻塞等待信號量喚醒或者超時,接收到訂閱時喚醒// 使用的是Semaphore#tryAcquire()currentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) {commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {// 因為是同步操作,所以無論加鎖成功或失敗 , 都取消訂閱unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);}//return get(tryLockAsync(waitTime, leaseTime, unit));}

推薦閱讀