分布式锁实现--基于zookeeper和redis的两种方案
前言:
最近项目中需要一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行。查阅相关资料总结了一下实现分布式锁的两种实现方案。
分布式锁的由来:
在程序开发过程中不得不考虑的就是并发问题。在 Java 中,对于同一个 JVM 内的并发,JDK 已经提供了 Lock、synchronized 等机制。但在分布式场景下,往往存在多个进程对同一资源产生竞争关系,而这些进程往往运行在不同的机器上,这个时候 JDK 中提供的机制已经不能满足需求。分布式锁顾名思义就是能够满足分布式情况下并发控制需求的锁。
方案一:基于zookeeper实现分布式锁
基于ZooKeeper分布式锁的流程
- 在zookeeper指定节点(locks)下创建临时顺序节点node_n
- 获取locks下所有子节点children
- 对子节点按节点自增序号从小到大排序
- 判断本节点是不是第一个子节点,若是,则获取锁;若不是,则监听比该节点小的那个节点的删除事件
- 若监听事件生效,则回到第二步重新进行判断,直到获取到锁
流程图:
具体实现
下面就具体使用java和zookeeper实现分布式锁,操作zookeeper使用的是apache提供的zookeeper的包。
- 通过实现 Watcher 接口的 process(WatchedEvent event) 方法来实施监控,并使用 CountDownLatch 完成等待:在等待锁时通过 CountDownLatch 计数阻塞,监听到前一个节点被删除后执行 countDown,停止等待,继续运行。
- 以下整体流程基本与上述描述流程一致,只是在监听的时候使用的是CountDownLatch来监听前一个节点
实现代码:
public class DistributedLock implements Lock, Watcher { private ZooKeeper zk = null; // 根节点 private String ROOT_LOCK = "/locks"; // 竞争的资源 private String lockName; // 等待的前一个锁 private String WAIT_LOCK; // 当前锁 private String CURRENT_LOCK; // 计数器 private CountDownLatch countDownLatch; private int sessionTimeout = 30000; private List<Exception> exceptionList = new ArrayList<Exception>(); /** * 配置分布式锁 * @param config 连接的url * @param lockName 竞争资源 */ public DistributedLock(String config, String lockName) { this.lockName = lockName; try { // 连接zookeeper zk = new ZooKeeper(config, sessionTimeout, this); Stat stat = zk.exists(ROOT_LOCK, false); if (stat == null) { // 如果根节点不存在,则创建根节点 zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } // 节点监视器 @Override public void process(WatchedEvent event) { if (this.countDownLatch != null) { this.countDownLatch.countDown(); } } public void lock() { if (exceptionList.size() > 0) { throw new LockException(exceptionList.get(0)); } try { if (this.tryLock()) { System.out.println(Thread.currentThread().getName() + " " + lockName + "获得了锁"); return; } else { // 等待锁 waitForLock(WAIT_LOCK, sessionTimeout); } } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } public boolean tryLock() { try { String splitStr = "_lock_"; if (lockName.contains(splitStr)) { throw new LockException("锁名有误"); } // 创建临时有序节点 CURRENT_LOCK = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(CURRENT_LOCK + " 已经创建"); // 取所有子节点 List<String> subNodes = zk.getChildren(ROOT_LOCK, false); // 取出所有lockName的锁 List<String> lockObjects = new ArrayList<String>(); for (String node : subNodes) { String _node = node.split(splitStr)[0]; if 
(_node.equals(lockName)) { lockObjects.add(node); } } Collections.sort(lockObjects); System.out.println(Thread.currentThread().getName() + " 的锁是 " + CURRENT_LOCK); // 若当前节点为最小节点,则获取锁成功 if (CURRENT_LOCK.equals(ROOT_LOCK + "/" + lockObjects.get(0))) { return true; } // 若不是最小节点,则找到自己的前一个节点 String prevNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1); WAIT_LOCK = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } return false; } public boolean tryLock(long timeout, TimeUnit unit) { try { if (this.tryLock()) { return true; } return waitForLock(WAIT_LOCK, timeout); } catch (Exception e) { e.printStackTrace(); } return false; } // 等待锁 private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException { Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true); if (stat != null) { System.out.println(Thread.currentThread().getName() + "等待锁 " + ROOT_LOCK + "/" + prev); this.countDownLatch = new CountDownLatch(1); // 计数等待,若等到前一个节点消失,则precess中进行countDown,停止等待,获取锁 this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS); this.countDownLatch = null; System.out.println(Thread.currentThread().getName() + " 等到了锁"); } return true; } public void unlock() { try { System.out.println("释放锁 " + CURRENT_LOCK); zk.delete(CURRENT_LOCK, -1); CURRENT_LOCK = null; zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } public Condition newCondition() { return null; } public void lockInterruptibly() throws InterruptedException { this.lock(); } public class LockException extends RuntimeException { private static final long serialVersionUID = 1L; public LockException(String e){ super(e); } public LockException(Exception e){ super(e); } } } |
方案二:基于redis实现分布式锁
话不多说了直接上例子吧!
mport org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ValueOperations; import java.io.Serializable; import java.util.concurrent.TimeUnit; /** * @author yekeping * @Title: ${file_name} * @Package ${package_name} * @Description: ${todo} * @date 2018/7/59:25 */ public class DistributedLock{ private String regionName; private Long expire = 0l; protected RedisTemplate<Serializable, Serializable> redisTemplate; private Serializable key(Serializable key) { if(regionName != null && regionName.length() > 0){ return regionName + "." + String.valueOf(key); } return key; } private ValueOperations<Serializable, Serializable> opsForValue(){ return redisTemplate.opsForValue(); } public void set(Serializable key, Serializable value) { if (expire != null && expire > 0) { opsForValue().set(key(key), value, expire, TimeUnit.SECONDS); } else { opsForValue().set(key(key), value); } } public void set(Serializable key, Serializable value, Long _expire, TimeUnit expireTimeUnit) { if (_expire == null || _expire <= 0) { set(key, value); } else { redisTemplate.opsForValue().set(key(key), value, _expire, expireTimeUnit); } } public Serializable get(Serializable key) { return redisTemplate.opsForValue().get(key(key)); } public void delete(Serializable key) { redisTemplate.delete(key(key)); } public Serializable getAndSet(Serializable key, Serializable newValue) { return opsForValue().getAndSet(key(key), newValue); } public boolean setNotExist(Serializable key, Serializable value) { return opsForValue().setIfAbsent(key(key), value); } public boolean lock(Serializable lockKey, Long expireTimeOut, TimeUnit expireTimeUnit, Long lockTimeOut,TimeUnit lockTimeUnit) { long start = System.currentTimeMillis(); long timeout = lockTimeUnit.toMillis(lockTimeOut); // if !useTimeout, boolean useTimeout = timeout > 0; while (useTimeout ? 
isTimeout(start, timeout) : true) { Serializable lockExpireTime = System.currentTimeMillis() + expireTimeUnit.toMillis(expireTimeOut) + 1;// 锁超时时间 if (setNotExist(lockKey, lockExpireTime)) { // 获取到锁 return true; } Serializable value = get(lockKey); if (value != null && timeExpired(value)) { // 假设多个线程(非单jvm)同时走到这里 Serializable oldValue = getAndSet(lockKey, lockExpireTime);// 获取并设置新值,此操作是原子性的 // 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getset是原子性的) // 加入拿到的oldValue依然是expired的,那么就说明拿到锁了 if (oldValue != null && timeExpired(oldValue)) { return true; } } } return false; } public boolean tryLock(Serializable lockKey, Long expireTimeOut, TimeUnit expireTimeUnit) { long lockExpireTime = System.currentTimeMillis() + expireTimeUnit.toMillis(expireTimeOut) + 1;// 锁超时时间 if (setNotExist(lockKey, lockExpireTime)) { // 获取到锁 return true; } if (timeExpired(get(lockKey))) { // lock is expired // 假设多个线程(非单jvm)同时走到这里 Serializable oldValue = getAndSet(lockKey, lockExpireTime); //getAndSet是原子性的 // 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getAndSet是原子性的) // 加入拿到的oldValue依然是expired的,那么就说明拿到锁了 if (oldValue != null && timeExpired(oldValue)) { return true; } } return false; } public void unlock(Serializable lockKey) { delete(lockKey); } private boolean timeExpired(Serializable value) { return Long.parseLong(String.valueOf(value)) < System.currentTimeMillis(); } private boolean isTimeout(long start, long timeout) { return start + timeout > System.currentTimeMillis(); } public void setRedisTemplate(RedisTemplate<Serializable, Serializable> redisTemplate) { this.redisTemplate = redisTemplate; } public void setRegionName(String regionName) { this.regionName = regionName; } public void setExpire(Long expire) { this.expire = expire; } } |