帶你研究Redis分散式鎖,原始碼走起

前言

前陣子我們講了分散式鎖的實現方式之一:zookeeper,那麼這次我們來講講同樣流行,甚至更勝一籌的Redis。

除了這兩種其實還有資料庫實現分散式鎖啊,但是這種方式是非主流,所以咱這裡就不講了,要講咱就講主流的。

分散式鎖幾大特性

互斥性。在任意時刻,只有一個客戶端能持有鎖,也叫唯一性。

不會發生死鎖。即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證後續其他客戶端能加鎖。

解鈴還須繫鈴人。加鎖和解鎖必須是同一個客戶端,客戶端自己不能把別人加的鎖給解了,即不能誤解鎖。

鎖不能自己失效。正常執行程式過程中,鎖不能因為某些原因失效。

具有容錯性。只要大多數Redis節點正常執行,客戶端就能夠獲取和釋放鎖。

下面我們舉一些實現方式,逐步理解這幾大特性。

第一種實現方式(初級)

publicvoidwrongRedisLock(Jedis jedis, String lockKey, int expireTime){

// 過期時間

long expires = System。currentTimeMillis() + expireTime;

String expiresStr = String。valueOf(expires);

if (jedis。setnx(lockKey, expiresStr) == 1) {

// 開始執行程式碼邏輯

}

}

互斥性

首先這裡使用的是setnx這個命令,這個命令的特點就是,如果要設定的key不存在,那麼我就可以設定成功。如果key存在,我就設定失敗。

這樣的特點會保證Redis裡只有一個唯一的key,一群客戶端同時去設定key時,也只有一個人能設定成功。

因為這個特性,他保證了第一個特性:互斥性。

不會發生死鎖

他這裡設定了過期時間,即使客戶端宕機的時候,鎖也會自動被釋放,因為過期時間一到,key就會被自動刪除了。

因為這個特性,他保證了第二個特性:不會發生死鎖。

除了以上兩個特性滿足外,其他三個特性都沒有滿足。

第二種實現方式(中級)

加鎖實現

/**

* 獲取分散式鎖(加鎖程式碼)

* @param jedis Redis客戶端

* @param lockKey 鎖

* @param requestId 請求標識

* @param expireTime 超期時間

* @return 是否獲取成功

*/

publicstaticbooleangetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime){

String result = jedis。set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

if (LOCK_SUCCESS。equals(result)) {

returntrue;

}

returnfalse;

}

解鈴還須繫鈴人

這個加鎖與第一種不同之處在於:設定了value值。(value要是唯一能代表客戶端的標識)

這個value代表是哪個客戶端加的鎖,當解鎖的時候就需要對比value,看是不是這個客戶端加的鎖。如果是才能解鎖成功,否則解鎖失敗。

他保證了第三個特性:解鈴還須繫鈴人。

解鎖實現

/**

* 釋放分散式鎖(解鎖程式碼)

* @param jedis Redis客戶端

* @param lockKey 鎖

* @param requestId 請求標識

* @return 是否釋放成功

*/

publicstaticbooleanreleaseDistributedLock(Jedis jedis, String lockKey, String requestId){

String script = “if ” +

“redis。call(‘get’, KEYS[1]) == ARGV[1]” +

“then”+

“return redis。call(‘del’, KEYS[1])” +

“else” +

“return 0 end”;

Object result = jedis。eval(script, Collections。singletonList(lockKey), Collections。singletonList(requestId));

if (RELEASE_SUCCESS。equals(result)) {

returntrue;

}

returnfalse;

}

這裡使用的是Lua指令碼,為什麼要使用這個指令碼呢?

大家看上面的解鎖操作,正常情況下,if/else的解鎖操作不是原子性的, 存在併發安全問題。

那麼在Redis裡執行Lua指令碼,能保證這些操作是原子性的,不存在併發安全問題,這就是Lua指令碼的作用。

帶大家解讀以上Lua指令碼的意思。

redis。call是呼叫Redis的API方法,這裡是呼叫的get和delete方法,key是KEYS[1]這個引數,它相當於一個佔位符表示式,真實賦值是方法外傳進來的lockKey,下面的ARGV[1]也是同理。

整個Lua指令碼連起來的意思就是:如果透過lockKey獲取到的value值等於方法外傳進來的值requestId,那麼就刪除掉lockKey,否則返回0。

這個第二種方式,保證了互斥性、不會發生死鎖、解鈴還須繫鈴人,可以說滿足了大部分場景的需求,那麼第四點和第五點還是沒有滿足,我們下面繼續來看。

第三種實現方式(高階)

Redisson

Redisson是一個在Redis的基礎上實現的Java駐記憶體資料網格(In-Memory Data Grid)。

它不僅提供了一系列的分散式的Java常用物件,還實現了可重入鎖(Reentrant Lock)、公平鎖(Fair Lock、聯鎖(MultiLock)、 紅鎖(RedLock)、 讀寫鎖(ReadWriteLock)等,還提供了許多分散式服務。

Redisson提供了使用Redis的最簡單和最便捷的方法。Redisson的宗旨是促進使用者對Redis的關注分離(Separation of Concern),從而讓使用者能夠將精力更集中地放在處理業務邏輯上。

用法舉例

publicvoidtestReentrantLock(RedissonClient redisson){

RLock lock = redisson。getLock(“anyLock”);

try{

// 1。 最常見的使用方法

//lock。lock();

// 2。 支援過期解鎖功能,10秒鐘以後自動解鎖, 無需呼叫unlock方法手動解鎖

//lock。lock(10, TimeUnit。SECONDS);

// 3。 嘗試加鎖,最多等待3秒,上鎖以後10秒自動解鎖

boolean res = lock。tryLock(3, 10, TimeUnit。SECONDS);

if(res){ //成功

// do your business

}

} catch (InterruptedException e) {

e。printStackTrace();

} finally {

lock。unlock();

}

}

大家可以看到和我們的ReentrantLock用法上類似,我們來讀讀他的原始碼吧。

重點方法羅列

publicclassRedissonLock{

//————————————Lock介面方法————————————-

/**

* 加鎖 鎖的有效期預設30秒

*/

voidlock();

/**

* tryLock()方法是有返回值的,它表示用來嘗試獲取鎖,如果獲取成功,則返回true,如果獲取失敗(即鎖已被其他執行緒獲取),則返回false。

*/

booleantryLock();

/**

* tryLock(long time, TimeUnit unit)方法和tryLock()方法是類似的,只不過區別在於這個方法在拿不到鎖時會等待一定的時間,

* 在時間期限之內如果還拿不到鎖,就返回false。如果如果一開始拿到鎖或者在等待期間內拿到了鎖,則返回true。

*

* @param time 等待時間

* @param unit 時間單位 小時、分、秒、毫秒等

*/

booleantryLock(long time, TimeUnit unit)throws InterruptedException;

/**

* 解鎖

*/

voidunlock();

/**

* 中斷鎖 表示該鎖可以被中斷 假如A和B同時調這個方法,A獲取鎖,B為獲取鎖,那麼B執行緒可以透過

* Thread。currentThread()。interrupt(); 方法真正中斷該執行緒

*/

voidlockInterruptibly();

/**

* 加鎖 上面是預設30秒這裡可以手動設定鎖的有效時間

*

* @param leaseTime 鎖有效時間

* @param unit 時間單位 小時、分、秒、毫秒等

*/

voidlock(long leaseTime, TimeUnit unit);

/**

* 這裡比上面多一個引數,多新增一個鎖的有效時間

*

* @param waitTime 等待時間

* @param leaseTime 鎖有效時間

* @param unit 時間單位 小時、分、秒、毫秒等

*/

booleantryLock(long waitTime, long leaseTime, TimeUnit unit)throws InterruptedException;

/**

* 檢驗該鎖是否被執行緒使用,如果被使用返回True

*/

booleanisLocked();

/**

* 檢查當前執行緒是否獲得此鎖(這個和上面的區別就是該方法可以判斷是否當前執行緒獲得此鎖,而不是此鎖是否被執行緒佔有)

* 這個比上面那個實用

*/

booleanisHeldByCurrentThread();

/**

* 中斷鎖 和上面中斷鎖差不多,只是這裡如果獲得鎖成功,新增鎖的有效時間

* @param leaseTime 鎖有效時間

* @param unit 時間單位 小時、分、秒、毫秒等

*/

voidlockInterruptibly(long leaseTime, TimeUnit unit);

}

下面我們講其中一種加鎖方式:tryLock,其餘的大家可以自己看看,原理都差不多。

tryLock加鎖原始碼解讀

大家先看看加鎖流程圖

帶你研究Redis分散式鎖,原始碼走起

整個程式碼主流程

程式碼都做了註釋,大家可以跟著註釋閱讀原始碼

@Override

publicbooleantryLock(long waitTime, long leaseTime, TimeUnit unit)throws InterruptedException {

//取得最大等待時間

long time = unit。toMillis(waitTime);

//記錄下當前時間

long current = System。currentTimeMillis();

//取得當前執行緒id(判斷是否可重入鎖的關鍵)

long threadId = Thread。currentThread()。getId();

//1。嘗試申請鎖,返回還剩餘的鎖過期時間

Long ttl = tryAcquire(leaseTime, unit, threadId);

//2。如果為空,表示申請鎖成功

if (ttl == null) {

returntrue;

}

//3。申請鎖的耗時如果大於等於最大等待時間,則申請鎖失敗

time -= System。currentTimeMillis() - current;

if (time <= 0) {

/**

* 透過 promise。trySuccess 設定非同步執行的結果為null

* Promise從Uncompleted——>Completed ,通知 Future 非同步執行已完成

*/

acquireFailed(threadId);

returnfalse;

}

current = System。currentTimeMillis();

/**

* 4。訂閱鎖釋放事件,並透過await方法阻塞等待鎖釋放,有效的解決了無效的鎖申請浪費資源的問題:

* 基於資訊量,當鎖被其它資源佔用時,當前執行緒透過 Redis 的 channel 訂閱鎖的釋放事件,一旦鎖釋放會發訊息通知待等待的執行緒進行競爭

* 當 this。await返回false,說明等待時間已經超出獲取鎖最大等待時間,取消訂閱並返回獲取鎖失敗

* 當 this。await返回true,進入迴圈嘗試獲取鎖

*/

RFuture subscribeFuture = subscribe(threadId);

//await 方法內部是用CountDownLatch來實現阻塞,獲取subscribe非同步執行的結果(應用了Netty 的 Future)

if (!await(subscribeFuture, time, TimeUnit。MILLISECONDS)) {

if (!subscribeFuture。cancel(false)) {

subscribeFuture。onComplete((res, e) -> {

if (e == null) {

unsubscribe(subscribeFuture, threadId);

}

});

}

acquireFailed(threadId);

returnfalse;

}

try {

//計算獲取鎖的總耗時,如果大於等於最大等待時間,則獲取鎖失敗

time -= System。currentTimeMillis() - current;

if (time <= 0) {

acquireFailed(threadId);

returnfalse;

}

/**

* 5。收到鎖釋放的訊號後,在最大等待時間之內,迴圈一次接著一次的嘗試獲取鎖

* 獲取鎖成功,則立馬返回true,

* 若在最大等待時間之內還沒獲取到鎖,則認為獲取鎖失敗,返回false結束迴圈

*/

while (true) {

long currentTime = System。currentTimeMillis();

// 再次嘗試申請鎖

ttl = tryAcquire(leaseTime, unit, threadId);

// 成功獲取鎖則直接返回true結束迴圈

if (ttl == null) {

returntrue;

}

//超過最大等待時間則返回false結束迴圈,獲取鎖失敗

time -= System。currentTimeMillis() - currentTime;

if (time <= 0) {

acquireFailed(threadId);

returnfalse;

}

/**

* 6。阻塞等待鎖(透過訊號量(共享鎖)阻塞,等待解鎖訊息):

*/

currentTime = System。currentTimeMillis();

if (ttl >= 0 && ttl < time) {

//如果剩餘時間(ttl)小於wait time ,就在 ttl 時間內,從Entry的訊號量獲取一個許可(除非被中斷或者一直沒有可用的許可)。

getEntry(threadId)。getLatch()。tryAcquire(ttl, TimeUnit。MILLISECONDS);

} else {

//則就在wait time 時間範圍內等待可以透過訊號量

getEntry(threadId)。getLatch()。tryAcquire(time, TimeUnit。MILLISECONDS);

}

//7。更新剩餘的等待時間(最大等待時間-已經消耗的阻塞時間)

time -= System。currentTimeMillis() - currentTime;

if (time <= 0) {

acquireFailed(threadId);

returnfalse;

}

}

} finally {

//7。無論是否獲得鎖,都要取消訂閱解鎖訊息

unsubscribe(subscribeFuture, threadId);

}

}

核心加鎖程式碼

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId){

return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));

}

private RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId){

//設定了鎖持有時間

if (leaseTime != -1) {

return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands。EVAL_LONG);

}

//未設定鎖持有時間,使用看門狗的預設的30秒

RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor。getConnectionManager()。getCfg()。getLockWatchdogTimeout(), TimeUnit。MILLISECONDS, threadId, RedisCommands。EVAL_LONG);

// 非同步獲取結果,如果獲取鎖成功,則啟動定時執行緒進行鎖續約

ttlRemainingFuture。onComplete((ttlRemaining, e) -> {

if (e != null) {

return;

}

// 啟動WatchDog

if (ttlRemaining == null) {

scheduleExpirationRenewal(threadId);

}

});

return ttlRemainingFuture;

}

RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command){

internalLockLeaseTime = unit。toMillis(leaseTime);

/**

* 透過 EVAL 命令執行 Lua 指令碼獲取鎖,保證了原子性

*/

return commandExecutor。evalWriteAsync(getName(), LongCodec。INSTANCE, command,

// 1。如果快取中的key不存在,則執行 hset 命令(hset key UUID+threadId 1),然後透過 pexpire 命令設定鎖的過期時間(即鎖的租約時間)

// 返回空值 nil ,表示獲取鎖成功

“if (redis。call(‘exists’, KEYS[1]) == 0) then ” +

“redis。call(‘hset’, KEYS[1], ARGV[2], 1); ” +

“redis。call(‘pexpire’, KEYS[1], ARGV[1]); ” +

“return nil; ” +

“end; ” +

// 如果key已經存在,並且value也匹配,表示是當前執行緒持有的鎖,則執行 hincrby 命令,重入次數加1,並且設定失效時間

“if (redis。call(‘hexists’, KEYS[1], ARGV[2]) == 1) then ” +

“redis。call(‘hincrby’, KEYS[1], ARGV[2], 1); ” +

“redis。call(‘pexpire’, KEYS[1], ARGV[1]); ” +

“return nil; ” +

“end; ” +

//如果key已經存在,但是value不匹配,說明鎖已經被其他執行緒持有,透過 pttl 命令獲取鎖的剩餘存活時間並返回,至此獲取鎖失敗

“return redis。call(‘pttl’, KEYS[1]);”,

//這三個引數分別對應KEYS[1],ARGV[1]和ARGV[2]

Collections。singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

}

引數說明:

KEYS[1]就是Collections。singletonList(getName()),表示分散式鎖的key;

ARGV[1]就是internalLockLeaseTime,即鎖的租約時間(持有鎖的有效時間),預設30s;

ARGV[2]就是getLockName(threadId),是獲取鎖時set的唯一值 value,即UUID+threadId。

大家注意到看門狗那個功能沒?scheduleExpirationRenewal(threadId);這個方法的使命就是給鎖續命。

簡單來說就是一個定時任務,定時去判斷鎖還有多久失效,如果快失效了,就把鎖的失效時間延長。

這裡就實現了我們之前所說的第四點:

鎖不能自己失效

tryLock解鎖原始碼解讀

大家先看看加鎖流程圖

帶你研究Redis分散式鎖,原始碼走起

原始碼

呼叫關係:unlock —> unlockAsync —> unlockInnerAsync,unlockInnerAsync是解鎖的核心程式碼

@Override

publicvoidunlock(){

try {

get(unlockAsync(Thread。currentThread()。getId()));

} catch (RedisException e) {

if (e。getCause() instanceof IllegalMonitorStateException) {

throw (IllegalMonitorStateException) e。getCause();

} else {

throw e;

}

}

}

@Override

public RFuture unlockAsync(long threadId){

RPromise result = new RedissonPromise();

RFuture future = unlockInnerAsync(threadId);

future。onComplete((opStatus, e) -> {

cancelExpirationRenewal(threadId);

if (e != null) {

result。tryFailure(e);

return;

}

if (opStatus == null) {

IllegalMonitorStateException cause = new IllegalMonitorStateException(“attempt to unlock lock, not locked by current thread by node id: ”

+ id + “ thread-id: ” + threadId);

result。tryFailure(cause);

return;

}

result。trySuccess(null);

});

return result;

}

// 核心解鎖程式碼

protected RFuture unlockInnerAsync(long threadId){

/**

* 透過 EVAL 命令執行 Lua 指令碼獲取鎖,保證了原子性

*/

return commandExecutor。evalWriteAsync(getName(), LongCodec。INSTANCE, RedisCommands。EVAL_BOOLEAN,

//如果分散式鎖存在,但是value不匹配,表示鎖已經被其他執行緒佔用,無權釋放鎖,那麼直接返回空值(解鈴還須繫鈴人)

“if (redis。call(‘hexists’, KEYS[1], ARGV[3]) == 0) then ” +

“return nil;” +

“end; ” +

//如果value匹配,則就是當前執行緒佔有分散式鎖,那麼將重入次數減1

“local counter = redis。call(‘hincrby’, KEYS[1], ARGV[3], -1); ” +

//重入次數減1後的值如果大於0,表示分散式鎖有重入過,那麼只能更新失效時間,還不能刪除

“if (counter > 0) then ” +

“redis。call(‘pexpire’, KEYS[1], ARGV[2]); ” +

“return 0; ” +

“else ” +

//重入次數減1後的值如果為0,這時就可以刪除這個KEY,併發布解鎖訊息,返回1

“redis。call(‘del’, KEYS[1]); ” +

“redis。call(‘publish’, KEYS[2], ARGV[1]); ” +

“return 1; ”+

“end; ” +

“return nil;”,

//這5個引數分別對應KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]

Arrays。asList(getName(), getChannelName()), LockPubSub。UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

}

解鎖訊息通知:

之前加鎖的時候原始碼裡寫過,如果沒獲取鎖成功,就監聽這個鎖,監聽它什麼時候釋放,所以解鎖的時候,要發出這個訊息通知,讓其他想獲取鎖的客戶端知道。

publicclassLockPubSubextendsPublishSubscribe {

publicstaticfinal Long UNLOCK_MESSAGE = 0L;

publicstaticfinal Long READ_UNLOCK_MESSAGE = 1L;

publicLockPubSub(PublishSubscribeService service){

super(service);

}

@Override

protected RedissonLockEntry createEntry(RPromise newPromise){

returnnew RedissonLockEntry(newPromise);

}

@Override

protectedvoidonMessage(RedissonLockEntry value, Long message){

/**

* 判斷是否是解鎖訊息

*/

if (message。equals(UNLOCK_MESSAGE)) {

Runnable runnableToExecute = value。getListeners()。poll();

if (runnableToExecute != null) {

runnableToExecute。run();

}

/**

* 釋放一個訊號量,喚醒等待的entry。getLatch()。tryAcquire去再次嘗試申請鎖

*/

value。getLatch()。release();

} elseif (message。equals(READ_UNLOCK_MESSAGE)) {

while (true) {

/**

* 如果還有其他Listeners回撥,則也喚醒執行

*/

Runnable runnableToExecute = value。getListeners()。poll();

if (runnableToExecute == null) {

break;

}

runnableToExecute。run();

}

value。getLatch()。release(value。getLatch()。getQueueLength());

}

}

}

到這裡,Redis官方實現的分散式鎖原始碼就講完了,但是有個問題,它雖然實現了鎖不能自己失效這個特性,但是容錯性方面還是沒有實現。

容錯性場景舉例

因為在工作中Redis都是叢集部署的,所以要考慮叢集節點掛掉的問題。給大家舉個例子:

1、A客戶端請求主節點獲取到了鎖

2、主節點掛掉了,但是還沒把鎖的資訊同步給其他從節點

3、由於主節點掛了,這時候開始主從切換,從節點成為主節點繼續工作,但是新的主節點上,沒有A客戶端的加鎖資訊

4、這時候B客戶端來加鎖,因為目前是一個新的主節點,上面沒有其他客戶端加鎖資訊,所以B客戶端獲取鎖成功

5、這時候就存在問題了,A和B兩個客戶端同時都持有鎖,同時在執行程式碼,那麼這時候分散式鎖就失效了。

這裡大家會有疑問了,為啥官方給出一個分散式鎖的實現,卻不解決這個問題呢,因為發生這種情況的機率不大,而且解決這個問題的成本有點小高。

所以,如果業務場景可以容忍這種小機率的錯誤,則推薦使用 RedissonLock,如果無法容忍,老哥這裡給忍不了的同學留個思考題。

RedissonRedLock

,這個中文名字叫紅鎖,它可以解決這個叢集容錯性的問題,這裡把它當做思考題留給大家。別偷懶,下去認真學。

部分源於:Gopher_39b2

  • 顶部