Redisson源碼解讀-分布式鎖( 三 )

value.getLatch().release() 也就是Semaphore#release  , 會喚醒Semaphore#tryAcquire阻塞的線程
解鎖上面我們聊了加鎖 , 本小節來聊下解鎖 。調用路徑如下
// RedissonLock#unlock// RedissonBaseLock#unlockAsync(long threadId)public RFuture<Void> unlockAsync(long threadId) {// 調用lua解鎖RFuture<Boolean> future = unlockInnerAsync(threadId);CompletionStage<Void> f = future.handle((opStatus, e) -> {// 取消看門狗cancelExpirationRenewal(threadId);if (e != null) {throw new CompletionException(e);}if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);throw new CompletionException(cause);}return null;});return new CompletableFutureWrapper<>(f);}解鎖的邏輯不復雜,調用lua腳本解鎖以及取消看門狗 ??撮T狗晚點說,先說下lua解鎖
// RedissonLock#unlockInnerAsyncprotected RFuture<Boolean> unlockInnerAsync(long threadId) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end; " +"return nil;",Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));}老規矩,分析下這段lua:

  1. 如果鎖不存在 , 返回null
  2. 鎖的值減1,如果鎖的值大于0(也就是可重入鎖仍然有加鎖次數),則重新設置過期時間
  3. 如果鎖的值小于等于0,這說明可以真正解鎖了,刪除鎖并通過發布訂閱機制發布解鎖消息
從 lua 中可以看到,解鎖時會發布消息到 channel 中,加鎖方法RedissonLock#tryLock中有相對應的訂閱操作 。
看門狗試想一個場景:程序執行需要10秒 , 程序執行完成才去解鎖,而鎖的存活時間只有5秒,也就是程序執行到一半的時候鎖就可以被其他程序獲取了 , 這顯然不合適 。那么怎么解決呢?
  1. 方式一:鎖永遠存在,直到解鎖 。不設置存活時間 。
    這種方法的弊端在于,如果程序沒解鎖就掛了,鎖就成了死鎖
  2. 方式二:依然設置鎖存活時間,但是監控程序的執行,如果程序還沒有執行完成 , 則定期給鎖續期 。
方式二就是Redisson的看門狗機制 。看門狗只有在沒有顯示指定鎖的持有時間(leaseTime)時才會生效 。
// RedissonLock#tryAcquireAsync// RedissonBaseLock#scheduleExpirationRenewalprotected void scheduleExpirationRenewal(long threadId) {// 創建ExpirationEntry,并放入EXPIRATION_RENEWAL_MAP中,下面的renewExpiration()方法會從map中再拿出來用ExpirationEntry entry = new ExpirationEntry();ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);try {// 看門狗的具體邏輯renewExpiration();} finally {// 如果線程被中斷了 , 就取消看門狗if (Thread.currentThread().isInterrupted()) {// 取消看門狗cancelExpirationRenewal(threadId);}}}}scheduleExpirationRenewal 方法處理了ExpirationEntry和如果出現異常則取消看門狗,具體看門狗邏輯在 renewExpiration 方法中
private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}// 創建延時任務,延時時間是internalLockLeaseTime / 3Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}// lua腳本判斷 , 如果鎖存在,則續期并返回true,不存在則返回falseCompletionStage<Boolean> future = renewExpirationAsync(threadId);future.whenComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getRawName() + " expiration", e);EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}if (res) {// 鎖續期成功 , 則再啟動一個延時任務,繼續監測renewExpiration();} else {// 取消看門狗cancelExpirationRenewal(null);}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);}Timeout 是一個延時任務 , 延時 internalLockLeaseTime / 3 時間執行 。任務的內容主要是通過 renewExpirationAsync 方法對鎖進行續期,如果續期失?。ń怄i了、鎖到期等),則取消看門狗,如果續期成功,則遞歸 renewExpiration 方法,繼續創建延時任務 。

推薦閱讀