java多執行緒-顯式鎖和AQS

顯式鎖和AQS

CAS

原子類

顯式鎖

AQS

CAS

CAS

是用於實現多執行緒同步的原子指令。 它將記憶體位置的內容與給定值進行比較,只有在相同的情況下,將該記憶體位置的內容修改為新的給定值。 這是作為單個原子操作完成的。 原子性保證新值基於最新資訊計算; 如果該值在同一時間被另一個執行緒更新,則寫入將失敗。 操作結果必須說明是否進行替換; 這可以透過一個簡單的布林響應(這個變體通常稱為比較和設定),或透過返回從記憶體位置讀取的值來完成。 JAVA1。5開始引入了CAS,主要程式碼都放在JUC的atomic包下。是同步鎖的一種樂觀鎖。

CAS 操作包含三個運算元 —— 記憶體位置(V)、預期原值(A)和新值(B)。-> 認為位置 V 應該包含值 A;如果包含該值,則將 B 放到這個位置;否則,不要更改該位置,返回這個位置現在的值即可。“

現代的CPU提供了特殊的指令,可以自動更新共享資料,而且能夠檢測到其他執行緒的干擾,而 compareAndSet() 就用這些代替了鎖定。

下面是

AtomicInteger

使用cas的方式。可以發現是

cas

+ 死迴圈來處理獲取值。這裡的死迴圈也被稱為自旋

// 在沒有鎖的機制下可能需要藉助volatile關鍵字保證資料的可見性private volatile int value; /** 自增採用了CAS操作,每次從記憶體中讀取資料然後將此資料和+1後的結果進行CAS操作,如果成功就返回結果,否則重試直到成功為止。compareAndSet利用JNI來完成CPU指令的操作整體的過程就是這樣子的,利用CPU的CAS指令,同時藉助JNI來完成Java的非阻塞演算法。其它原子操作都是利用類似的特性完成的。*/public final int incrementAndGet() { // 自旋 for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next)) return next; }}// CAS操作public final boolean compareAndSet(int expect, int update) { return unsafe。compareAndSwapInt(this, valueOffset, expect, update);}

CAS帶來的問題:

ABA問題

資料原來值為A,執行緒1將值變化為B,執行緒2將值變化為A。執行緒3檢視發現還是A,

迴圈時間長開銷大

CAS通常配合自旋來實現變數的執行緒安全,那麼當CAS一直獲取不到鎖的時候,會導致自旋時間過長,浪費資源

只能保證一個共享變數的原子操作

原子類

這裡就寫幾個簡單的程式展示下使用,具體還是要在工作中用到哪些在練習

AtomicInteger

package atomic。integer;import java。util。concurrent。atomic。AtomicInteger;public class AtomicIntDemo { static AtomicInteger i = new AtomicInteger(10); public static void main(String[] args) { //10——->11 System。out。println(i。getAndIncrement()); //11——->12——->out System。out。println(i。incrementAndGet()); System。out。println(i。get()); }}

AtomicBoolean

package atomic。booleanA;import java。util。concurrent。TimeUnit;import java。util。concurrent。atomic。AtomicBoolean;public class AtomicBooleanDemo implements Runnable { /** * 原子操作類 * static 必須有,因為exists是作為共享變數來使用的, * 如果去掉static,那麼在main方法中新建的 AtomicBooleanDemo 物件就是倆個exists就無法做到原子類的控制作用 */ private static AtomicBoolean exists = new AtomicBoolean(false); private String name; public AtomicBooleanDemo(String name) { this。name = name; } @Override public void run() { if (exists。compareAndSet(false, true)) { System。out。println(name + “ enter”); try { System。out。println(name + “ working”); TimeUnit。SECONDS。sleep(2); } catch (InterruptedException e) { // do nothing } System。out。println(name + “ leave”); exists。set(false); } else { System。out。println(name + “ give up”); } } public static void main(String[] args) { AtomicBooleanDemo bar1 = new AtomicBooleanDemo(“bar1”); AtomicBooleanDemo bar2 = new AtomicBooleanDemo(“bar2”); new Thread(bar1)。start(); new Thread(bar2)。start(); }}

AtomicIntegerArray

package atomic。array;import java。util。concurrent。atomic。AtomicIntegerArray;public class AtomicArrayDemo { static int[] value = new int[] { 1, 2 }; static AtomicIntegerArray array = new AtomicIntegerArray(value); public static void main(String[] args) { array。getAndSet(0, 3); System。out。println(array。get(0)); System。out。println(value[0]); }}

AtomicReference

這裡說明下,這個相當於更換了

AtomicReference

裡面的例項,不會對原資料修改

package atomic。reference;import java。util。concurrent。atomic。AtomicReference;public class AtomicReferenceDemo { static AtomicReference userRef = new AtomicReference(); public static void main(String[] args) { UserInfo user = new UserInfo(“jimmy”, 15);//要修改的實體的例項 userRef。set(user); UserInfo updateUser = new UserInfo(“jimmaly”, 17);//要變化的新例項 userRef。compareAndSet(user, updateUser); System。out。println(userRef。get()。getName()); System。out。println(userRef。get()。getAge()); System。out。println(user。getName()); System。out。println(user。getAge()); } //定義一個實體類 static class UserInfo { private String name; private int age; public UserInfo(String name, int age) { this。name = name; this。age = age; } public String getName() { return name; } public int getAge() { return age; } }}

AtomicStampedReference 帶版本戳的原子操作類

package atomic。reference;import java。util。concurrent。atomic。AtomicStampedReference;public class AtomicStampedReferenceDemo { static AtomicStampedReference asr = new AtomicStampedReference<>(“jimmy”,0); public static void main(String[] args) throws InterruptedException { final int oldStamp = asr。getStamp();//那初始的版本號 final String oldReferenc = asr。getReference(); System。out。println(oldReferenc+“===========”+oldStamp); Thread rightStampThread = new Thread(new Runnable() { @Override public void run() { System。out。println(Thread。currentThread()。getName() +“當前變數值:”+oldReferenc+“當前版本戳:”+oldStamp+“-” +asr。compareAndSet(oldReferenc, oldReferenc+“Java”, oldStamp, oldStamp+1)); } }); Thread errorStampThread = new Thread(new Runnable() { @Override public void run() { String reference = asr。getReference(); System。out。println(Thread。currentThread()。getName() +“當前變數值:”+reference+“當前版本戳:”+asr。getStamp()+“-” +asr。compareAndSet(reference, reference+“C”, oldStamp, oldStamp+1)); } }); rightStampThread。start(); rightStampThread。join(); errorStampThread。start(); errorStampThread。join(); System。out。println(asr。getReference()+“===========”+asr。getStamp()); }}

顯式鎖

Lock介面

看下這個介面有哪些方法

void lock();void lockInterruptibly() throws InterruptedException;boolean tryLock();void unlock();Condition newCondition();

先看一個demo

/** * @Author: jimmy * @Date: 2021/4/18 09:08 * @Description: */public class MyThread extends Thread{ private Test test = null; public MyThread(Test test, String name) { super(name); this。test = test; } @Override public void run() { test。insert(Thread。currentThread()); System。out。println(“執行緒 ” + Thread。currentThread()。getName() + “被中斷。。。”); }}package lock。lockexception;import java。util。concurrent。locks。Lock;import java。util。concurrent。locks。ReentrantLock;/** * @Author: jimmy * @Date: 2021/4/18 09:08 * @Description: */public class Test { private Lock lock = new ReentrantLock(); public static void main(String[] args) { Test test = new Test(); MyThread thread1 = new MyThread(test, “A”); MyThread thread2 = new MyThread(test, “B”); thread1。start(); thread2。start(); try { Thread。sleep(5000); System。out。println(“執行緒” + Thread。currentThread()。getName() + “ 睡醒了。。。”); } catch (InterruptedException e) { e。printStackTrace(); } thread2。interrupt(); } public void insert(Thread thread) { try { // 注意,如果將獲取鎖放在try語句塊裡,則必定會執行finally語句塊中的解鎖操作。若執行緒在獲取鎖時被中斷,則再執行解鎖操作就會導致異常,因為該執行緒並未獲得到鎖。 lock。lockInterruptibly(); System。out。println(“執行緒 ” + thread。getName() + “得到了鎖。。。”); long startTime = System。currentTimeMillis(); for (;;) { if (System。currentTimeMillis() - startTime >= Integer。MAX_VALUE) // 耗時操作 break; // 插入資料 } } catch (Exception e) { System。out。println(“————————————————”); } finally { System。out。println(Thread。currentThread()。getName() + “執行finally。。。”); lock。unlock(); System。out。println(“執行緒 ” + thread。getName() + “釋放了鎖。。。”); } }}

Lock介面和synchronized的比較:

synchronized 程式碼簡潔,

Lock:獲取鎖可以被中斷,超時獲取鎖,嘗試獲取鎖,讀多寫少用讀寫鎖

這裡還有幾個概念。

可重入鎖某個執行緒已經獲得某個鎖,可以再次獲取鎖而不會出現死鎖,

synchronized

ReentrantLock

都屬於可重入鎖。

使用

ReentrantLock

的時候一定要

手動釋放鎖

,並且

加鎖次數和釋放次數要一樣

公平鎖、非公平鎖在時間上,先對鎖進行獲取的請求,一定先被滿足,這個鎖就是公平的,不滿足,就是非公平的。非公平的效率一般來講更高。因為執行緒的上下文切換比較耗時,而非公平鎖可能會不存在切換上下文的情況。

ReadWriteLock& ReentrantReadWriteLock

ReadWriteLock

介面和讀寫鎖

ReentrantReadWriteLock

。不廢話直接上程式碼:

//——————- test ————————-///** * @Author: jimmy * @Date: 2021/4/18 10:17 * @Description: */public class RwLockTest { // 讀寫執行緒的比例 static final int readWriteRatio = 10; // 執行緒數 static final int threadCount = 3; private static class Reader implements Runnable{ private RwLockDemo rwLockDemo; public Reader(RwLockDemo rwLockDemo) { this。rwLockDemo = rwLockDemo; } @Override public void run() { long start = System。currentTimeMillis(); for(int i=0;i<100;i++){//操作100次 try { rwLockDemo。getNum(); } catch (InterruptedException e) { System。out。println(“Reader ex。。。。”); } } System。out。println(Thread。currentThread()。getName()+“讀取商品資料耗時:” +(System。currentTimeMillis()-start)+“ms”); } } private static class Writter implements Runnable{ private RwLockDemo rwLockDemo; public Writter(RwLockDemo rwLockDemo) { this。rwLockDemo = rwLockDemo; } @Override public void run() { long start = System。currentTimeMillis(); Random r = new Random(); for(int i=0; i<10; i++){//操作10次 try { Thread。sleep(50); rwLockDemo。setNum(r。nextInt(10)); } catch (Exception e) { System。out。println(“Writter ex。。。。”); } } System。out。println(Thread。currentThread()。getName() +“寫商品資料耗時:”+(System。currentTimeMillis()-start)+“ms————-”); } } // Test public static void main(String[] args) throws InterruptedException { GoodsInfo goodsInfo = new GoodsInfo(“Cup”,100000,10000); RwLockDemo rwLockDemo = new RwLockDemo(goodsInfo); for(int i = 0; i

Tip:讀寫鎖同一時刻允許多個讀執行緒同時訪問,但是寫執行緒訪問的時候,所有的讀和寫都被阻塞,最適宜與

讀多寫少

的情況

Condition

condition介面用來配合lock使用,表示條件,可以實現等待通知功能

public interface Lock { Condition newCondition();}public interface Condition { void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll();}

下面是一個簡單的demo

public class ExpressCond { public final static String CITY = “HarBin”; /*快遞運輸里程數*/ private int km; /*快遞到達地點*/ private String site; private Lock lock = new ReentrantLock(); private Condition keCond = lock。newCondition(); private Condition siteCond = lock。newCondition(); public ExpressCond() { } public ExpressCond(int km, String site) { this。km = km; this。site = site; } /** * 變化公里數,然後通知處於wait狀態並需要處理公里數的執行緒進行業務處理 */ public void changeKm(){ lock。lock(); try { this。km = 101; keCond。signalAll(); }finally { lock。unlock(); } } /** * 變化地點,然後通知處於wait狀態並需要處理地點的執行緒進行業務處理 */ public void changeSite(){ lock。lock(); try { this。site = “BeiJing”; siteCond。signal(); }finally { lock。unlock(); } } /** * 當快遞的里程數大於100時更新資料庫 */ public void waitKm(){ lock。lock(); try { while(this。km<=100) { try { keCond。await(); System。out。println(“check km thread[”+Thread。currentThread()。getId() +“] is be notifed。”); } catch (InterruptedException e) { // TODO Auto-generated catch block e。printStackTrace(); } } }finally { lock。unlock(); } System。out。println(“the Km is ”+this。km+“,I will change db”); } /** * 當快遞到達目的地時通知使用者 */ public void waitSite(){ lock。lock(); try { while(CITY。equals(this。site)) { try { siteCond。await(); System。out。println(“check site thread[”+Thread。currentThread()。getId() +“] is be notifed。”); } catch (InterruptedException e) { // TODO Auto-generated catch block e。printStackTrace(); } } }finally { lock。unlock(); } System。out。println(“the site is ”+this。site+“,I will call user”); }}public class TestCond { private static ExpressCond express = new ExpressCond(0,ExpressCond。CITY); /*檢查里程數變化的執行緒,不滿足條件,執行緒一直等待*/ private static class CheckKm extends Thread{ @Override public void run() { express。waitKm(); } } /*檢查地點變化的執行緒,不滿足條件,執行緒一直等待*/ private static class CheckSite extends Thread{ @Override public void run() { express。waitSite(); } } public static void main(String[] args) throws InterruptedException { for(int i=0;i<3;i++){ new CheckSite()。start(); } for(int i=0;i<3;i++){ new CheckKm()。start(); } Thread。sleep(1000); //快遞里程變化 express。changeKm(); }}

AQS

AbstractQueuedSynchronizer抽象的佇列式的同步器

AQS

設計思路是

CAS

操作 +

volatile

關鍵字 + 改造的

CLH佇列

採用的是模板方法的設計模式。

獨佔模式獲取

accquire

acquireInterruptibly

tryAcquireNanos

共享式獲取

acquireShared

acquireSharedInterruptibly

tryAcquireSharedNanos

獨佔式釋放鎖

release

共享式釋放鎖

releaseShared

需要子類覆蓋的流程方法

獨佔式獲取 tryAcquire

獨佔式釋放 tryRelease

共享式獲取 tryAcquireShared

共享式釋放 tryReleaseShared

這個同步器是否處於獨佔模式 isHeldExclusively

下面稍微對原始碼進行解讀下,更加詳細的可以檢視 java多執行緒進階學習1的

ASQ

部分

// 狀態值 voatileprivate volatile int state;// CLH 佇列static final class Node {}// 獲取鎖、釋放鎖public final void acquire(int arg)public final void acquireInterruptibly(int arg)public final boolean release(int arg) 。。。// CAS private final boolean compareAndSetHead(Node update) protected final boolean compareAndSetState(int expect, int update) 。。。

AQS鎖的佔用和釋放

這裡直接使用事前總結的內容:

獲取獨佔鎖流程:

入口方法acquire(arg)

tryAcquire(arg)嘗試獲取鎖,若成功則返回,若失敗則走下一步

將當前執行緒構造成一個Node節點,並利用CAS將其加入到同步佇列到尾部,然後該節點對應到執行緒進入自旋狀態自旋時,首先判斷其前驅節點是否為頭節點&是否成功獲取同步狀態,兩個條件都成立,則將當前執行緒的節點設定為頭節點,如果不是,則利用**

LockSupport。park(this)

**將當前執行緒掛起 ,等待被前驅節點喚醒

釋放獨佔鎖流程:

release(arg)

tryRelease(arg)釋放同步狀態

獲取當前節點的下一個節點,利用LockSupport。unpark(currentNode。next。thread)喚醒後繼節點

獲取共享鎖流程

acquireShared(arg)入口方法

tryAcquireShared(arg)模版方法獲取同步狀態,如果返返回值>=0,則說明同步狀態(state)有剩餘,獲取鎖成功直接返回

如果tryAcquireShared(arg)返回值<0,說明獲取同步狀態失敗,向佇列尾部新增一個共享型別的Node節點,隨即該節點進入自旋狀態

自旋時,首先檢查前驅節點釋放為頭節點&tryAcquireShared()是否>=0(即成功獲取同步狀態)

如果是,則說明當前節點可執行,同時把當前節點設定為頭節點,並且喚醒所有後繼節點。如果否,則利用LockSupport。unpark(this)掛起當前執行緒,等待被前驅節點喚醒

釋放共享鎖流程

releaseShared(arg)模版方法釋放同步狀態。如果釋放成,則遍歷整個佇列,利用LockSupport。unpark(nextNode。thread)喚醒所有後繼節點

獨佔鎖和共享鎖在實現上的區別

獨佔鎖的同步狀態值為1,即同一時刻只能有一個執行緒成功獲取同步狀態

共享鎖的同步狀態>1,取值由上層同步元件確定

獨佔鎖佇列中頭節點執行完成後釋放它的直接後繼節點

共享鎖佇列中頭節點執行完成後釋放它後面的所有節點

共享鎖中會出現多個執行緒(即同步佇列中的節點)同時成功獲取同步狀態的情況