Redisson源碼解讀-分布式鎖( 二 )

先看一下整體邏輯:

  1. 嘗試加鎖 , 成功直接返回true
  2. 判斷超時
  3. 訂閱
  4. 判斷超時
  5. 循環 (嘗試獲取鎖 → 判斷超時 → 阻塞等待 )
tryLock方法看著很長,但是有很多代碼都是重復的,本小節重點說一下嘗試加鎖的方法tryAcquire
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));}private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;if (leaseTime > 0) {// 調用lua腳本 , 嘗試加鎖ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {// 這里的if、else的區別就在于,如果沒有設置leaseTime,就使用默認的internalLockLeaseTime(默認30秒)ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {// lock acquired// 如果ttlRemaining為空 , 也就是tryLockInnerAsync方法中的lua執行結果返回空 , 證明獲取鎖成功if (ttlRemaining == null) {if (leaseTime > 0) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {// 如果沒有設置鎖的持有時間(leaseTime),則啟動看門狗,定時給鎖續期 , 防止業務邏輯未執行完成鎖就過期了scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);}tryAcquireAsync方法中 , 主要分為兩段邏輯:
  1. 調用lua腳本加鎖:tryLockInnerAsync
  2. 看門狗:scheduleExpirationRenewal
看門狗在后面講,本小節重點還是在加鎖
// RedissonLock#tryLockInnerAsync<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));}Redisson使用了 Hash 結構來表示一個鎖,這樣 Hash 里面的 key 為線程id,value 為鎖的次數 。這樣巧妙地解決了可重入鎖的問題 。
下面我們來分析下這段 lua 腳本的邏輯(下面說的threadId都是指變量,不是說key就叫’threadId’):
  1. 如果鎖(hash結構)不存在,則創建 , 并添加一個鍵值對 (threadId : 1),并設置鎖的過期時間
  2. 如果鎖存在,則將鍵值對 threadId 對應的值 + 1 , 并設置鎖的過期時間
  3. 如果不如何1,2點,則返回鎖的剩余過期時間
訂閱讓我們把視線重新回到RedissonLock#tryLock中 , 當經過一些嘗試獲取鎖,超時判斷之后,代碼來到while循環中 。這個while循環是個死循環,只有成功獲取鎖或者超時,才會退出 。一般死循環的設計中,都會有阻塞等待的代碼,否則如果循環中的邏輯短時間拿不到結果 , 會造成資源搶占和浪費 。阻塞代碼就是下面這段
if (ttl >= 0 && ttl < time) {commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}commandExecutor.getNow(subscribeFuture).getLatch() 得到的是一個Semaphore信號量對象,這是jdk的內置對象,Semaphore#tryAcquire表示阻塞并等待喚醒 。那么信號量什么時候被喚醒呢?在訂閱方法中RedissonLock#subscribe 。訂閱方法的邏輯也不少 , 咱們直接講其最終調用的處理方法
// LockPubSub#onMessageprotected void onMessage(RedissonLockEntry value, Long message) {// 普通的解鎖走的是這個if (message.equals(UNLOCK_MESSAGE)) {Runnable runnableToExecute = value.getListeners().poll();if (runnableToExecute != null) {runnableToExecute.run();}// 這里就是喚醒信號量的地方value.getLatch().release();// 這個是讀寫鎖用的} else if (message.equals(READ_UNLOCK_MESSAGE)) {while (true) {Runnable runnableToExecute = value.getListeners().poll();if (runnableToExecute == null) {break;}runnableToExecute.run();}value.getLatch().release(value.getLatch().getQueueLength());}}

推薦閱讀