zk系列三:zookeeper實戰之分布式鎖實現

一、分布式鎖的通用實現思路分布式鎖的概念以及常規解決方案可以參考之前的博客:聊聊分布式鎖的解決方案;今天我們先分析下分布式鎖的實現思路;

  • 首先,需要保證唯一性 , 即某一時點只能有一個線程訪問某一資源;比方說待辦短信通知功能 , 每天早上九點短信提醒所有工單的處理人處理工單,假設服務部署了20個容器,那么早上九點的時候會有20個線程啟動準備發送短信,此時我們只能讓一個線程執行短信發送 , 否則用戶會收到20條相同的短信;
  • 其次,需要考慮下何時應該釋放鎖?這又分三種情況,一是拿到鎖的線程正常結束 , 另一種是獲取鎖的線程異常退出,還有種是獲取鎖的線程一直阻塞;第一種情況直接釋放即可,第二種情況可以通過定義下鎖的過期時間然后通過定時任務去釋放鎖;zk的話直接通過臨時節點即可;最后一種阻塞的情況也可以通過定時任務來釋放,但是需要根據業務來綜合判斷 , 如果業務本身就是長時間耗時的操作那么鎖的過期時間就得設置的久一點
  • 最后 , 當拿到鎖的線程釋放鎖的時候,如何通知其他線程可以搶鎖了呢這里簡單介紹兩種解決方案,一種是所有需要鎖的線程主動輪詢,固定時間去訪問下看鎖是否釋放,但是這種方案無端增加服務器壓力并且時效性無法保證;另一種就是zk的watch , 監聽鎖所在的目錄 , 一有變化立馬得到通知
二、ZK實現分布式鎖的思路
  • zk通過每個線程在同一父目錄下創建臨時有序節點,然后通過比較節點的id大小來實現分布式鎖功能;再通過zk的watch機制實時獲取節點的狀態,如果被刪除立即重新爭搶鎖;具體流程見線圖:
    zk系列三:zookeeper實戰之分布式鎖實現

    文章插圖
提示:需要關注下圖里判斷自身不是最小節點時的監聽情況,為什么不監聽父節點?原因圖里已有描述,這里就不再贅述
三、ZK實現分布式鎖的編碼實現1、核心工具類實現通過不斷的調試 , 我封裝了一個ZkLockHelper類,里面封裝了上鎖和釋放鎖的方法 , 為了方便我將zk的一些監聽和回調機智也融合到一起了,并沒有抽出來,下面貼上該類的全部代碼
package com.darling.service.zookeeper.lock;import lombok.Data;import lombok.extern.slf4j.Slf4j;import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import org.junit.platform.commons.util.StringUtils;import java.util.Collections;import java.util.List;import java.util.Objects;import java.util.concurrent.CountDownLatch;/** * @description: * @author: dll * @date: Created in 2022/11/4 8:41 * @version: * @modified By: */@Data@Slf4jpublic class ZkLockHelper implements AsyncCallback.StringCallback, AsyncCallback.StatCallback,Watcher, AsyncCallback.ChildrenCallback {private final String lockPath = "/lockItem";ZooKeeper zkClient;String threadName;CountDownLatch cd = new CountDownLatch(1);private String pathName;/*** 上鎖*/public void tryLock() {try {log.info("線程:{}正在創建節點",threadName);zkClient.create(lockPath,(threadName).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,"AAA");log.info("線程:{}正在阻塞......",threadName);// 由于上面是異步創建所以這里需要阻塞住當前線程cd.await();} catch (InterruptedException e) {e.printStackTrace();}}/*** 釋放鎖*/public void unLock() {try {zkClient.delete(pathName,-1);System.out.println(threadName + " 工作結束....");} catch (Exception e) {e.printStackTrace();}}/*** create方法的回調,創建成功后在此處獲取/DCSLock的子目錄,比較節點ID是否最小 , 是則拿到鎖 。。。* @param rc狀態碼* @param pathcreate方法的path入參* @param ctxcreate方法的上下文入參* @param name創建成功的臨時有序節點的名稱 , 即在path的后面加上了zk維護的自增ID;*注意如果創建的不是有序節點,那么此處的name和path的內容一致*/@Overridepublic void processResult(int rc, String path, Object ctx, String name) {log.info(">>>>>>>>>>>>>>>>>processResult,rx:{},path:{},ctx:{},name:{}",rc,path,ctx.toString(),name);if (StringUtils.isNotBlank(name)) {try {pathName =name ;// 此處path需注意要寫/zkClient.getChildren("/", false,this,"123");//List<String> children = zkClient.getChildren("/", false);//log.info(">>>>>threadName:{},children:{}",threadName,children);//// 給children排序//Collections.sort(children);//int i = children.indexOf(pathName.substring(1));//// 判斷自身是否第一個//if (Objects.equals(i,0)) {//// 是第一個則表示搶到了鎖//log.info("線程{}搶到了鎖",threadName);//cd.countDown();//}else {//// 表示沒搶到鎖//log.info("線程{}搶鎖失敗 , 重新注冊監聽器",threadName);//zkClient.exists("/"+children.get(i-1),this,this,"AAA");//}} catch (Exception e) {e.printStackTrace();}}}/*** exists方法的回調,此處暫不做處理* @param rc* @param path* @param ctx* @param stat*/@Overridepublic void processResult(int rc, String path, Object ctx, Stat stat) {}/*** exists的watch監聽* @param event*/@Overridepublic void process(WatchedEvent event) {//如果第一個線程鎖釋放了,等價于第一個線程刪除了節點,此時只有第二個線程會監控的到switch (event.getType()) {case None:break;case NodeCreated:break;case NodeDeleted:zkClient.getChildren("/", false,this,"123");//// 此處path需注意要寫"/"http://List<String> children = null;//try {//children = zkClient.getChildren("/", false);//} catch (KeeperException e) {//e.printStackTrace();//} catch (InterruptedException e) {//e.printStackTrace();//}//log.info(">>>>>threadName:{},children:{}",threadName,children);//// 給children排序//Collections.sort(children);//int i = children.indexOf(pathName.substring(1));//// 判斷自身是否第一個//if (Objects.equals(i,0)) {//// 是第一個則表示搶到了鎖//log.info("線程{}搶到了鎖",threadName);//cd.countDown();//}else {///**//*表示沒搶到鎖;需要判斷前置節點存不存在,其實這里并不是特別關心前置節點存不存在,所以其回調可以不處理;//*但是這里關注的前置節點的監聽,當前置節點監聽到被刪除時就是其他線程搶鎖之時//*///zkClient.exists("/"+children.get(i-1),this,this,"AAA");//}break;case NodeDataChanged:break;case NodeChildrenChanged:break;}}/*** getChildren方法的回調* @param rc* @param path* @param ctx* @param children*/@Overridepublic void processResult(int rc, String path, Object ctx, List<String> children) {try {log.info(">>>>>threadName:{},children:{}", threadName, children);if (Objects.isNull(children)) {return;}// 給children排序Collections.sort(children);int i = children.indexOf(pathName.substring(1));// 判斷自身是否第一個if (Objects.equals(i, 0)) {// 是第一個則表示搶到了鎖log.info("線程{}搶到了鎖", threadName);cd.countDown();} else {// 表示沒搶到鎖log.info("線程{}搶鎖失敗,重新注冊監聽器", threadName);/***表示沒搶到鎖;需要判斷前置節點存不存在,其實這里并不是特別關心前置節點存不存在,所以其回調可以不處理;*但是這里關注的前置節點的監聽,當前置節點監聽到被刪除時就是其他線程搶鎖之時*/zkClient.exists("/" + children.get(i - 1), this, this, "AAA");}} catch (Exception e) {e.printStackTrace();}}}

推薦閱讀