目前分布式锁,比较成熟、主流的方案有基于redis及基于zookeeper的二种方案。

  大体来讲,基于redis的分布式锁核心指令为SETNX,即如果目标key存在,写入缓存失败返回0,反之如果目标key不存在,写入缓存成功返回1,通过区分这二个不同的返回值,可以认为SETNX成功即为获得了锁。

  redis分布式锁,看上去很简单,但其实要考虑周全,并不容易,网上有一篇文章讨论得很详细:http://blog.csdn.net/ugg/article/details/41894947/,有兴趣的可以阅读一下。

  其主要问题在于某些异常情况下,锁的释放会有问题,比如SETNX成功,应用获得锁,这时出于某种原因,比如网络中断,或程序出异常退出,会导致锁无法及时释放,只能依赖于缓存的过期时间,但是过期时间这个值设置多大,也是一个纠结的问题,设置小了,应用处理逻辑很复杂的话,可能会导致锁提前释放,如果设置大了,又会导致锁不能及时释放,所以那篇文章中针对这些细节讨论了很多。

  而基于zk的分布式锁,在锁的释放问题上处理起来要容易一些,其大体思路是利用zk的“临时顺序”节点,需要获取锁时,在某个约定节点下注册一个临时顺序节点,然后将所有临时节点按小从到大排序,如果自己注册的临时节点正好是最小的,表示获得了锁。(zk能保证临时节点序号始终递增,所以如果后面有其它应用也注册了临时节点,序号肯定比获取锁的应用更大)

  当应用处理完成,或者处理过程中出现某种原因,导致与zk断开,超过时间阈值(可配置)后,zk server端会自动删除该临时节点,即:锁被释放。所有参与锁竞争的应用,只要监听父路径的子节点变化即可,有变化时(即:有应用断开或注册时),开始抢锁,抢完了大家都在一边等着,直到有新变化时,开始新一轮抢锁。

  关于zk的分布式锁,网上也有一篇文章写得不错,见http://blog.csdn.net/desilting/article/details/41280869

个人感觉:zk做分布式锁机制更完善,但zk抗并发的能力弱于redis,性能上略差,建议如果并发要求高,锁竞争激烈,可考虑用redis,如果抢锁的频度不高,用zk更适合。

最后送福利时间到:

  文中提到的基于zk分布式锁的那篇文章,逻辑上虽然没有问题,但是有些场景下,锁的数量限制可能要求不止1个,比如:某些应用,我希望同时启动2个实例来处理,但是出于HA的考虑,又担心这二个实例会挂掉,这时可以启动4个(或者更多),这些实例中,只允许2个抢到锁的实例可以进行业务处理,其它实例处于standby状态(即:备胎),如果这二个抢到锁的实例挂了(比如异常退出),那么standby的实例会得到锁,即:备胎转正,开始正常业务处理,从而保证了系统的HA。

对于这些场景,我封装了一个抽象类,大家可在此基础上自行修改:(主要看明白思路就行,代码细节并不重要)

package cn.cnblogs.yjmyzz.zookeeper;

import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Created by yangjunming on 5/27/16.
 * 基于Zookeeper的分布式锁
 */
public abstract class AbstractLock {

    private int lockNumber = 1; //允许获取的锁数量(默认为1,即最小节点=自身时,认为获得锁)
    private ZkClient zk = null;
    private String rootNode = "/lock"; //根节点名称
    private String selfNode;
    private final String className = this.getClass().getSimpleName(); //当前实例的className
    private String selfNodeName;//自身注册的临时节点名
    private boolean handling = false;
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    private static final JsonUtil jsonUtil = new FastJsonUtil();
    private static final String SPLIT = "/";
    private String selfNodeFullName;

    /**
     * 通过Zk获取分布式锁
     */
    protected void getLock(int lockNumber) {
        setLockNumber(lockNumber);
        initBean();
        initNode();
        subscribe();
        register();
        heartBeat();
        remainRunning();
    }

    protected void getLock() {
        getLock(1);
    }

    /**
     * 初始化结点
     */
    private void initNode() {

        String error;
        if (!rootNode.startsWith(SPLIT)) {
            error = "rootNode必须以" + SPLIT + "开头";
            logger.error(error);
            throw new RuntimeException(error);
        }

        if (rootNode.endsWith(SPLIT)) {
            error = "不能以" + SPLIT + "结尾";
            logger.error(error);
            throw new RuntimeException(error);
        }

        int start = 1;
        int index = rootNode.indexOf(SPLIT, start);
        String path;
        while (index != -1) {
            path = rootNode.substring(0, index);
            if (!zk.exists(path)) {
                zk.createPersistent(path);
            }
            start = index + 1;
            if (start >= rootNode.length()) {
                break;
            }
            index = rootNode.indexOf(SPLIT, start);
        }

        if (start < rootNode.length()) {
            if (!zk.exists(rootNode)) {
                zk.createPersistent(rootNode);
            }
        }

        selfNode = rootNode + SPLIT + className;

        if (!zk.exists(selfNode)) {
            zk.createPersistent(selfNode);
        }
    }

    /**
     * 向zk注册自身节点
     */
    private void register() {
        selfNodeName = zk.createEphemeralSequential(selfNode + SPLIT, StringUtils.EMPTY);
        if (!StringUtils.isEmpty(selfNodeName)) {
            selfNodeFullName = selfNodeName;
            logger.info("自身节点:" + selfNodeName + ",注册成功!");
            selfNodeName = selfNodeName.substring(selfNode.length() + 1);
        }
        checkMin();
    }

    /**
     * 订阅zk的节点变化
     */
    private void subscribe() {
        zk.subscribeChildChanges(selfNode, (parentPath, currentChilds) -> {
            checkMin();
        });
    }

    /**
     * 检测是否获得锁
     */
    private void checkMin() {
        List<String> list = zk.getChildren(selfNode);
        if (CollectionUtils.isEmpty(list)) {
            logger.error(selfNode + " 无任何子节点!");
            lockFail();
            handling = false;
            return;
        }
        //按序号从小到大排
        Collections.sort(list);

        //如果自身ID在前N个锁中,则认为获取成功
        int max = Math.min(getLockNumber(), list.size());
        for (int i = 0; i < max; i++) {
            if (list.get(i).equals(selfNodeName)) {
                if (!handling) {
                    lockSuccess();
                    handling = true;
                    logger.info("获得锁成功!");
                }
                return;
            }
        }

        int selfIndex = list.indexOf(selfNodeName);
        if (selfIndex > 0) {
            logger.info("前面还有节点" + list.get(selfIndex - 1) + ",获取锁失败!");
        } else {
            logger.info("获取锁失败!");
        }
        lockFail();

        handling = false;
    }

    /**
     * 获得锁成功的处理回调
     */
    protected abstract void lockSuccess();

    /**
     * 获得锁失败的处理回调
     */
    protected abstract void lockFail();

    /**
     * 初始化相关的Bean对象
     */
    protected abstract void initBean();

    protected void setZkClient(ZkClient zk) {
        this.zk = zk;
    }

    protected int getLockNumber() {
        return lockNumber;
    }

    protected void setLockNumber(int lockNumber) {
        this.lockNumber = lockNumber;
    }

    protected void setRootNode(String value) {
        this.rootNode = value;
    }

    /**
     * 防程序退出
     */
    private void remainRunning() {
        byte[] lock = new byte[0];
        synchronized (lock) {
            try {
                lock.wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.error("remainRunning出错:", e);
            }
        }
    }

    /**
     * 定时向zk发送心跳
     */
    private void heartBeat() {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
        service.scheduleAtFixedRate(() -> {
            HeartBeat heartBeat = new HeartBeat();
            heartBeat.setHostIp(NetworkUtil.getHostAddress());
            heartBeat.setHostName(NetworkUtil.getHostName());
            heartBeat.setLastTime(new Date());
            heartBeat.setPid(RuntimeUtil.getPid());
            zk.writeData(selfNodeFullName, jsonUtil.toJson(heartBeat));
        }, 0, 15, TimeUnit.SECONDS);
    }
}

这个类中,提供了三个抽象方法:

    /**
     * 获得锁成功的处理回调
     */
    protected abstract void lockSuccess();

    /**
     * 获得锁失败的处理回调
     */
    protected abstract void lockFail();

    /**
     * 初始化相关的Bean对象
     */
    protected abstract void initBean();

用于处理抢锁成功、抢锁失败、及开抢前的一些对象初始化处理,子类继承后,只要实现这3个具体的方法即可,同时该抽象类默认还提供了心跳机制,用于定时向zk汇报自身的健康状态。

ZooKeeper 笔记(6) 分布式锁的更多相关文章

  1. 如何用Zookeeper来实现分布式锁?

    什么是Zookeeper临时顺序节点? 例如 : / 动物 植物 猫 仓鼠 荷花 松树 Zookeeper的数据存储结构就像一棵树,这棵树由节点组成,这种节点叫做Zonde.# Znode分为四种类型 ...

  2. 基于zookeeper实现的分布式锁

    基于zookeeper实现的分布式锁 2011-01-27 • 技术 • 7 条评论 • jiacheo •14,941 阅读 A distributed lock base on zookeeper ...

  3. 基于Zookeeper实现多进程分布式锁

    一.zookeeper简介及基本操作 Zookeeper 并不是用来专门存储数据的,它的作用主要是用来维护和监控你存储的数据的状态变化.当对目录节点监控状态打开时,一旦目录节点的状态发生变化,Watc ...

  4. 利用ZooKeeper简单实现分布式锁

    1.分布式锁的由来: 在程序开发过程中不得不考虑的就是并发问题.在java中对于同一个jvm而言,jdk已经提供了lock和同步等.但是在分布式情况下,往往存在多个进程对一些资源产生竞争关系,而这些进 ...

  5. 基于zookeeper简单实现分布式锁

    https://blog.csdn.net/desilting/article/details/41280869 这里利用zookeeper的EPHEMERAL_SEQUENTIAL类型节点及watc ...

  6. 基于zookeeper实现高性能分布式锁

    实现原理:利用zookeeper的持久性节点和Watcher机制 具体步骤: 1.创建持久性节点 zkLock 2.在此父节点下创建子节点列表,name按顺序定义 3.Java程序获取该节点下的所有顺 ...

  7. ZooKeeper如何完成分布式锁?

    * 面试答案为LZ所写,如需转载请注明出处,谢谢. 1.最基本的思路: 将<local_ip>:<task_id>存在某个路径节点里. 刚开始并没有这个节点,当有executo ...

  8. Zookeeper--0300--java操作Zookeeper,临时节点实现分布式锁原理

    删除Zookeeper的java客户端有  : 1,Zookeeper官方提供的原生API, 2,zkClient,在原生api上进行扩展的开源java客户端 3, 一.Zookeeper原生API ...

  9. 基于ZooKeeper的分布式锁和队列

    在分布式系统中,往往需要一些分布式同步原语来做一些协同工作,上一篇文章介绍了Zookeeper的基本原理,本文介绍下基于Zookeeper的Lock和Queue的实现,主要代码都来自Zookeeper ...

随机推荐

  1. AOPR破解的密码复制的方法

    Advanced Office Password Recovery是一款office密码破解工具,简称AOPR.使用过Advanced Office Password Recovery的用户都知道成功 ...

  2. 您可能不曾注意的C++内置类型选择和使用的注意事项

    写在前面: 太忙了,好久没有写博客.这篇文章是在下读C++ Primer中文第五版(与以往版本相比,第五版的一大特色就是“为新的C++11标准重新撰写”——引自封皮)时的笔记,没有什么技术含量,只是作 ...

  3. 解决Tomcat7“At least one JAR was scanned for TLDs yet contained no TLDs”问题

    解决Tomcat7“At least one JAR was scanned for TLDs yet contained no TLDs”问题 2013-12-05 21:58:00|  分类: t ...

  4. 【Mocha.js 101】同步、异步与 Promise

    前情提要 在上一篇文章<[Mocha.js 101]Mocha 入门指南>中,我们提到了如何用 Mocha.js 进行前端自动化测试,并做了几个简单的例子来体验 Mocha.js 给我们带 ...

  5. oracle批量修改多个表的数据

    方法一 写PL/SQL,开cursor declare  l_varID varchar2(20);  l_varSubName varchar2(30);  cursor mycur is sele ...

  6. codeforces mysterious present 最长上升子序列+倒序打印路径

    link:http://codeforces.com/problemset/problem/4/D #include <iostream> #include <cstdio> ...

  7. ubuntu下安装Node.js(源码安装)

    最近使用hexo的过程中出现了问题,中间载nodejs安装的时候也耽误了些许时间,所以在此记录一下安装的过程. 环境:ubuntu14.0.4LTS,安装nodejs版本node-v0.10.36.t ...

  8. 关于scrollview监听的一些方法

    一 package cn.testscrollview; import android.os.Bundle; import android.view.MotionEvent; import andro ...

  9. 负载均衡LVS集群详解

     一.LB--负载均衡 在负载均衡集群中需要一个分发器,我们将其称之为Director,它位于多台服务器的上面的中间层,根据内部锁定义的规则或调度方式从下面的服务器群中选择一个以此来进行响应请求,而其 ...

  10. HW5.16

    public class Solution { public static void main(String[] args) { for(int i = 2000; i <= 2010; i++ ...