java多執行緒-顯式鎖和AQS
顯式鎖和AQS
CAS
原子類
顯式鎖
AQS
CAS
CAS
是用於實現多執行緒同步的原子指令。 它將記憶體位置的內容與給定值進行比較,只有在相同的情況下,將該記憶體位置的內容修改為新的給定值。 這是作為單個原子操作完成的。 原子性保證新值基於最新資訊計算; 如果該值在同一時間被另一個執行緒更新,則寫入將失敗。 操作結果必須說明是否進行替換; 這可以透過一個簡單的布林響應(這個變體通常稱為比較和設定),或透過返回從記憶體位置讀取的值來完成。 JAVA1。5開始引入了CAS,主要程式碼都放在JUC的atomic包下。是同步鎖的一種樂觀鎖。
CAS 操作包含三個運算元 —— 記憶體位置(V)、預期原值(A)和新值(B)。-> 認為位置 V 應該包含值 A;如果包含該值,則將 B 放到這個位置;否則,不要更改該位置,返回這個位置現在的值即可。“
現代的CPU提供了特殊的指令,可以自動更新共享資料,而且能夠檢測到其他執行緒的干擾,而 compareAndSet() 就用這些代替了鎖定。
下面是
AtomicInteger
使用cas的方式。可以發現是
cas
+ 死迴圈來處理獲取值。這裡的死迴圈也被稱為自旋
// 在沒有鎖的機制下可能需要藉助volatile關鍵字保證資料的可見性private volatile int value; /** 自增採用了CAS操作,每次從記憶體中讀取資料然後將此資料和+1後的結果進行CAS操作,如果成功就返回結果,否則重試直到成功為止。compareAndSet利用JNI來完成CPU指令的操作整體的過程就是這樣子的,利用CPU的CAS指令,同時藉助JNI來完成Java的非阻塞演算法。其它原子操作都是利用類似的特性完成的。*/public final int incrementAndGet() { // 自旋 for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next)) return next; }}// CAS操作public final boolean compareAndSet(int expect, int update) { return unsafe。compareAndSwapInt(this, valueOffset, expect, update);}
CAS帶來的問題:
ABA問題
資料原來值為A,執行緒1將值變化為B,執行緒2將值變化為A。執行緒3檢視發現還是A,
迴圈時間長開銷大
CAS通常配合自旋來實現變數的執行緒安全,那麼當CAS一直獲取不到鎖的時候,會導致自旋時間過長,浪費資源
只能保證一個共享變數的原子操作
原子類
這裡就寫幾個簡單的程式展示下使用,具體還是要在工作中用到哪些在練習
AtomicInteger
package atomic。integer;import java。util。concurrent。atomic。AtomicInteger;public class AtomicIntDemo { static AtomicInteger i = new AtomicInteger(10); public static void main(String[] args) { //10——->11 System。out。println(i。getAndIncrement()); //11——->12——->out System。out。println(i。incrementAndGet()); System。out。println(i。get()); }}
AtomicBoolean
package atomic。booleanA;import java。util。concurrent。TimeUnit;import java。util。concurrent。atomic。AtomicBoolean;public class AtomicBooleanDemo implements Runnable { /** * 原子操作類 * static 必須有,因為exists是作為共享變數來使用的, * 如果去掉static,那麼在main方法中新建的 AtomicBooleanDemo 物件就是倆個exists就無法做到原子類的控制作用 */ private static AtomicBoolean exists = new AtomicBoolean(false); private String name; public AtomicBooleanDemo(String name) { this。name = name; } @Override public void run() { if (exists。compareAndSet(false, true)) { System。out。println(name + “ enter”); try { System。out。println(name + “ working”); TimeUnit。SECONDS。sleep(2); } catch (InterruptedException e) { // do nothing } System。out。println(name + “ leave”); exists。set(false); } else { System。out。println(name + “ give up”); } } public static void main(String[] args) { AtomicBooleanDemo bar1 = new AtomicBooleanDemo(“bar1”); AtomicBooleanDemo bar2 = new AtomicBooleanDemo(“bar2”); new Thread(bar1)。start(); new Thread(bar2)。start(); }}
AtomicIntegerArray
package atomic。array;import java。util。concurrent。atomic。AtomicIntegerArray;public class AtomicArrayDemo { static int[] value = new int[] { 1, 2 }; static AtomicIntegerArray array = new AtomicIntegerArray(value); public static void main(String[] args) { array。getAndSet(0, 3); System。out。println(array。get(0)); System。out。println(value[0]); }}
AtomicReference
這裡說明下,這個相當於更換了
AtomicReference
裡面的例項,不會對原資料修改
package atomic。reference;import java。util。concurrent。atomic。AtomicReference;public class AtomicReferenceDemo { static AtomicReference
AtomicStampedReference 帶版本戳的原子操作類
package atomic。reference;import java。util。concurrent。atomic。AtomicStampedReference;public class AtomicStampedReferenceDemo { static AtomicStampedReference
顯式鎖
Lock介面
看下這個介面有哪些方法
void lock();void lockInterruptibly() throws InterruptedException;boolean tryLock();void unlock();Condition newCondition();
先看一個demo
/** * @Author: jimmy * @Date: 2021/4/18 09:08 * @Description: */public class MyThread extends Thread{ private Test test = null; public MyThread(Test test, String name) { super(name); this。test = test; } @Override public void run() { test。insert(Thread。currentThread()); System。out。println(“執行緒 ” + Thread。currentThread()。getName() + “被中斷。。。”); }}package lock。lockexception;import java。util。concurrent。locks。Lock;import java。util。concurrent。locks。ReentrantLock;/** * @Author: jimmy * @Date: 2021/4/18 09:08 * @Description: */public class Test { private Lock lock = new ReentrantLock(); public static void main(String[] args) { Test test = new Test(); MyThread thread1 = new MyThread(test, “A”); MyThread thread2 = new MyThread(test, “B”); thread1。start(); thread2。start(); try { Thread。sleep(5000); System。out。println(“執行緒” + Thread。currentThread()。getName() + “ 睡醒了。。。”); } catch (InterruptedException e) { e。printStackTrace(); } thread2。interrupt(); } public void insert(Thread thread) { try { // 注意,如果將獲取鎖放在try語句塊裡,則必定會執行finally語句塊中的解鎖操作。若執行緒在獲取鎖時被中斷,則再執行解鎖操作就會導致異常,因為該執行緒並未獲得到鎖。 lock。lockInterruptibly(); System。out。println(“執行緒 ” + thread。getName() + “得到了鎖。。。”); long startTime = System。currentTimeMillis(); for (;;) { if (System。currentTimeMillis() - startTime >= Integer。MAX_VALUE) // 耗時操作 break; // 插入資料 } } catch (Exception e) { System。out。println(“————————————————”); } finally { System。out。println(Thread。currentThread()。getName() + “執行finally。。。”); lock。unlock(); System。out。println(“執行緒 ” + thread。getName() + “釋放了鎖。。。”); } }}
Lock介面和synchronized的比較:
synchronized 程式碼簡潔,
Lock:獲取鎖可以被中斷,超時獲取鎖,嘗試獲取鎖,讀多寫少用讀寫鎖
這裡還有幾個概念。
可重入鎖某個執行緒已經獲得某個鎖,可以再次獲取鎖而不會出現死鎖,
synchronized
ReentrantLock
都屬於可重入鎖。
使用
ReentrantLock
的時候一定要
手動釋放鎖
,並且
加鎖次數和釋放次數要一樣
公平鎖、非公平鎖在時間上,先對鎖進行獲取的請求,一定先被滿足,這個鎖就是公平的,不滿足,就是非公平的。非公平的效率一般來講更高。因為執行緒的上下文切換比較耗時,而非公平鎖可能會不存在切換上下文的情況。
ReadWriteLock& ReentrantReadWriteLock
ReadWriteLock
介面和讀寫鎖
ReentrantReadWriteLock
。不廢話直接上程式碼:
//——————- test ————————-///** * @Author: jimmy * @Date: 2021/4/18 10:17 * @Description: */public class RwLockTest { // 讀寫執行緒的比例 static final int readWriteRatio = 10; // 執行緒數 static final int threadCount = 3; private static class Reader implements Runnable{ private RwLockDemo rwLockDemo; public Reader(RwLockDemo rwLockDemo) { this。rwLockDemo = rwLockDemo; } @Override public void run() { long start = System。currentTimeMillis(); for(int i=0;i<100;i++){//操作100次 try { rwLockDemo。getNum(); } catch (InterruptedException e) { System。out。println(“Reader ex。。。。”); } } System。out。println(Thread。currentThread()。getName()+“讀取商品資料耗時:” +(System。currentTimeMillis()-start)+“ms”); } } private static class Writter implements Runnable{ private RwLockDemo rwLockDemo; public Writter(RwLockDemo rwLockDemo) { this。rwLockDemo = rwLockDemo; } @Override public void run() { long start = System。currentTimeMillis(); Random r = new Random(); for(int i=0; i<10; i++){//操作10次 try { Thread。sleep(50); rwLockDemo。setNum(r。nextInt(10)); } catch (Exception e) { System。out。println(“Writter ex。。。。”); } } System。out。println(Thread。currentThread()。getName() +“寫商品資料耗時:”+(System。currentTimeMillis()-start)+“ms————-”); } } // Test public static void main(String[] args) throws InterruptedException { GoodsInfo goodsInfo = new GoodsInfo(“Cup”,100000,10000); RwLockDemo rwLockDemo = new RwLockDemo(goodsInfo); for(int i = 0; i Tip:讀寫鎖同一時刻允許多個讀執行緒同時訪問,但是寫執行緒訪問的時候,所有的讀和寫都被阻塞,最適宜與 讀多寫少 的情況 Condition condition介面用來配合lock使用,表示條件,可以實現等待通知功能 public interface Lock { Condition newCondition();}public interface Condition { void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll();} 下面是一個簡單的demo public class ExpressCond { public final static String CITY = “HarBin”; /*快遞運輸里程數*/ private int km; /*快遞到達地點*/ private String site; private Lock lock = new ReentrantLock(); private Condition keCond = lock。newCondition(); private Condition siteCond = lock。newCondition(); public ExpressCond() { } public ExpressCond(int km, String site) { this。km = km; this。site = site; } /** * 變化公里數,然後通知處於wait狀態並需要處理公里數的執行緒進行業務處理 */ public void changeKm(){ lock。lock(); try { this。km = 101; keCond。signalAll(); }finally { lock。unlock(); } } /** * 變化地點,然後通知處於wait狀態並需要處理地點的執行緒進行業務處理 */ public void changeSite(){ lock。lock(); try { this。site = “BeiJing”; siteCond。signal(); }finally { lock。unlock(); } } /** * 當快遞的里程數大於100時更新資料庫 */ public void waitKm(){ lock。lock(); try { while(this。km<=100) { try { keCond。await(); System。out。println(“check km thread[”+Thread。currentThread()。getId() +“] is be notifed。”); } catch (InterruptedException e) { // TODO Auto-generated catch block e。printStackTrace(); } } }finally { lock。unlock(); } System。out。println(“the Km is ”+this。km+“,I will change db”); } /** * 當快遞到達目的地時通知使用者 */ public void waitSite(){ lock。lock(); try { while(CITY。equals(this。site)) { try { siteCond。await(); System。out。println(“check site thread[”+Thread。currentThread()。getId() +“] is be notifed。”); } catch (InterruptedException e) { // TODO Auto-generated catch block e。printStackTrace(); } } }finally { lock。unlock(); } System。out。println(“the site is ”+this。site+“,I will call user”); }}public class TestCond { private static ExpressCond express = new ExpressCond(0,ExpressCond。CITY); /*檢查里程數變化的執行緒,不滿足條件,執行緒一直等待*/ private static class CheckKm extends Thread{ @Override public void run() { express。waitKm(); } } /*檢查地點變化的執行緒,不滿足條件,執行緒一直等待*/ private static class CheckSite extends Thread{ @Override public void run() { express。waitSite(); } } public static void main(String[] args) throws InterruptedException { for(int i=0;i<3;i++){ new CheckSite()。start(); } for(int i=0;i<3;i++){ new CheckKm()。start(); } Thread。sleep(1000); //快遞里程變化 express。changeKm(); }} AQS AbstractQueuedSynchronizer抽象的佇列式的同步器 AQS 設計思路是 CAS 操作 + volatile 關鍵字 + 改造的 CLH佇列 採用的是模板方法的設計模式。 獨佔模式獲取 accquire acquireInterruptibly tryAcquireNanos 共享式獲取 acquireShared acquireSharedInterruptibly tryAcquireSharedNanos 獨佔式釋放鎖 release 共享式釋放鎖 releaseShared 需要子類覆蓋的流程方法 獨佔式獲取 tryAcquire 獨佔式釋放 tryRelease 共享式獲取 tryAcquireShared 共享式釋放 tryReleaseShared 這個同步器是否處於獨佔模式 isHeldExclusively 下面稍微對原始碼進行解讀下,更加詳細的可以檢視 java多執行緒進階學習1的 ASQ 部分 // 狀態值 voatileprivate volatile int state;// CLH 佇列static final class Node {}// 獲取鎖、釋放鎖public final void acquire(int arg)public final void acquireInterruptibly(int arg)public final boolean release(int arg) 。。。// CAS private final boolean compareAndSetHead(Node update) protected final boolean compareAndSetState(int expect, int update) 。。。 AQS鎖的佔用和釋放 這裡直接使用事前總結的內容: 獲取獨佔鎖流程: 入口方法acquire(arg) tryAcquire(arg)嘗試獲取鎖,若成功則返回,若失敗則走下一步 將當前執行緒構造成一個Node節點,並利用CAS將其加入到同步佇列到尾部,然後該節點對應到執行緒進入自旋狀態自旋時,首先判斷其前驅節點是否為頭節點&是否成功獲取同步狀態,兩個條件都成立,則將當前執行緒的節點設定為頭節點,如果不是,則利用** LockSupport。park(this) **將當前執行緒掛起 ,等待被前驅節點喚醒 釋放獨佔鎖流程: release(arg) tryRelease(arg)釋放同步狀態 獲取當前節點的下一個節點,利用LockSupport。unpark(currentNode。next。thread)喚醒後繼節點 獲取共享鎖流程 acquireShared(arg)入口方法 tryAcquireShared(arg)模版方法獲取同步狀態,如果返返回值>=0,則說明同步狀態(state)有剩餘,獲取鎖成功直接返回 如果tryAcquireShared(arg)返回值<0,說明獲取同步狀態失敗,向佇列尾部新增一個共享型別的Node節點,隨即該節點進入自旋狀態 自旋時,首先檢查前驅節點釋放為頭節點&tryAcquireShared()是否>=0(即成功獲取同步狀態) 如果是,則說明當前節點可執行,同時把當前節點設定為頭節點,並且喚醒所有後繼節點。如果否,則利用LockSupport。unpark(this)掛起當前執行緒,等待被前驅節點喚醒 釋放共享鎖流程 releaseShared(arg)模版方法釋放同步狀態。如果釋放成,則遍歷整個佇列,利用LockSupport。unpark(nextNode。thread)喚醒所有後繼節點 獨佔鎖和共享鎖在實現上的區別 獨佔鎖的同步狀態值為1,即同一時刻只能有一個執行緒成功獲取同步狀態 共享鎖的同步狀態>1,取值由上層同步元件確定 獨佔鎖佇列中頭節點執行完成後釋放它的直接後繼節點 共享鎖佇列中頭節點執行完成後釋放它後面的所有節點 共享鎖中會出現多個執行緒(即同步佇列中的節點)同時成功獲取同步狀態的情況