繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

為什麼大海無量火了

話題

透過上一篇我們分析了RxJava的訂閱,以及RxJava常見的面試問題,還沒有看上一篇點這裡RxJava面經一,拿去,不謝!。在上一篇我們分析過多次呼叫subscribeOn只有第一次呼叫的時候有效,原因是因為

最開始呼叫的subscribeOn返回的observable會把後面執行的subscribeOn返回的observable給覆蓋了,因此我們感官的是隻有第一次的subscribeOn能生效。實際中間每一個Observable生成的時候還是會有指定的執行緒的,只是在最上游的observable只接收第一次的subscribeOn指定的執行緒

,那麼我們可以透過doOnSubscribe監測中間的observable確實有自己的執行緒,這是我們這節探討的話題。

前一節分析了透過observeOn指定了緊跟其後的observer的執行緒,

如果我們多次呼叫observeOn,其實是最後一次observeOn才有效指定observer的執行緒

,那我們可以透過doOnNext來監聽每一次的observeOn執行緒的切換,這是我們這節探討的話題。

如果我們沒指定observer的執行緒,只指定了observable的執行緒,則observer的執行緒則會跟observable的執行緒一起走的,也就是我們只設置了subscribeOn,而沒有設定observeOn的情況。這是我們這節探討的話題。

上一節我們簡單的提過背壓,那麼背壓是什麼呢,以及Flowable怎麼能控制背壓也是我們這節討論的話題。

這節探討的話題

doOnSubscribe是怎麼做到監聽中間的observable的執行緒?

doOnNext是怎麼做到監聽每一次observeOn執行緒的切換,以及map的apply方法的執行緒有誰控制?

如果不指定observer的執行緒,也就是指設定subscribeOn,而不設定observeOn,那observer的執行緒是什麼樣的?

背壓是什麼,以及Flowable怎麼能控制背壓?

doOnSubscribe的監聽

在上一節我們介紹過subscribeOn是控制上游的observable在哪個執行緒執行,關於怎麼控制上游的observable可以看我上篇文章RxJava面經一,拿去,不謝!,那如果多次執行subscribeOn的時候,Observable接收的是第一次的subscribeOn指定的執行緒,因為每次設定都會被上一層subscribeOn設定的執行緒所覆蓋了,這裡的覆蓋是對於最上游的Observable而言的,中間生成的Observable其實是有執行緒切換的,我們可以透過doOnSubscribe來監聽每一次subscribeOn執行緒的切換,我們還是拿例子來說:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

在上一篇文章我們已經說過,訂閱是從最下游的observer到上面一層一層的observable,所以我們最下游的observable開始發生訂閱,也就是①處透過

subscribeOn

生成的

ObservableSubscribeOn

觀察者開始訂閱,它會在訂閱方法中,給它的上游的observable新增訂閱,也就是②號處透過

doOnSubscribe

生成的

ObservableDoOnLifecycle

觀察者開始訂閱,然後在它的訂閱裡面給③號訂閱,③號給④號新增訂閱,最後到最上游的observable發生訂閱,也就是最上游的

ObservableOnSubscribe

的subscribe方法被呼叫。這就是從下到上依次訂閱的順序,下面以一張圖說明訂閱順序:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

那什麼時候doOnSubscribe的內部類Consumer的accept方法什麼呼叫呢?

我們直接看上面的圖,它是在上一個Observable,也就是doOnSubscribe生成的

ObservableDoOnLifecycle

裡面的裝飾observer(

DisposableLambdaObserver

)監聽到訂閱的時候呼叫的。而在該例子中②號、④號透過doOnSubscribe生成的observable的上游observable是subscribeOn生成的,而subscribeOn最終是生成了

ObservableSubscribeOn

的observable,在它的訂閱裡面是直接給下游的observer新增訂閱監聽了:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

所以由上面可知②號處的doOnSubscribe列印是在③號上游的subscribeOn發生訂閱的時候,所以它最先打印出結果,再一次是④號打印出結果,最後是最上游的observable的訂閱列印。那每一處的doOnSubscribe中accept接收到的執行緒是怎麼回事呢,這個我先說結論,是跟它下面的subscribeOn指定的執行緒保持一致。所以②號處列印是①號處指定的執行緒,④號是③號處指定的執行緒列印,後面我們分析doOnSubscribe時候說。

上一節我們知道subscribeOn是指定它上游的observable訂閱發生的執行緒,

而doOnSubscribe運算子最終也是生成了一個ObservableDoOnLifecycle的observable,所以可以這麼說ObservableDoOnLifecycle的訂閱發生的執行緒是由緊跟它後面的subscribeOn指定的執行緒所決定的。

而在

ObservableDoOnLifecycle

的訂閱方法中,它是直接訂閱了上游的observable,在上面示例中也就是第二個

observable

ObservableDoOnLifecycle

subscribeActual

方法如下:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

上一篇介紹了在每一個Observable的訂閱方法中,會先建立裝飾的observer,並且把下游的observer傳到建立的裝飾的observer中,接著會給下游的observer新增訂閱的回撥,接著會給上游的observable新增訂閱,而在此處的

ObservableDoOnLifecycle

訂閱方法中先是建立了DisposableLambdaObserver的裝飾observer,接著給上游的observable新增訂閱。那給下游的obserer新增訂閱的監聽呢,這就放在了DisposableLambdaObserver的裝飾observer的

onSubscribe

中了。

由於上面我們透過doOnSubscribe生成最下游

observable(ObservableDoOnLifecycle)

的訂閱執行緒是io執行緒,所以它的上游observable也是io執行緒,我們還沒分析doOnSubscribe傳進去的Consumer的accept方法發生的執行緒,這個需要我們看下上面分析的ObservableDoOnLifecycle訂閱中建立的裝飾DisposableLambdaObserver:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

上面一上來就是給傳進來的Consumer執行了accept的回撥,緊接著給下游的observer新增訂閱的監聽,方便下游的observer能收到訂閱的回撥啊,是不是這麼回事呢?

那此處裝飾的

observer(DisposableLambdaObserver)

訂閱監聽是由誰發起的呢,肯定是上游的

observable

開始訂閱的時候發起的下游

observer

訂閱監聽啊,而上面我們分析了此處的上游

observable

訂閱執行緒是由緊挨

doOnSubscribe

subscribeOn

決定的,所以此處不難看出最終

doOnSubscribe

中的

consumer

監聽的是

subscribeOn

指定上游的

observable

訂閱過程中發生的執行緒,大家可以多理解這句話!!!

下面畫張圖補補腦:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

doOnSubscribe小節

上面分析在沒有結合原始碼的情況下,不好分析,整體就是subscribeOn會指定它上游的observable執行緒,而它上游又正好是doOnSubscribe生成的observable,該observable是

ObservableDoOnLifecycle

,在它的訂閱裡面又直接去訂閱了它上游的observable,所以此時doOnSubscribe的上游observable執行緒也是doOnSubscribe它下面的subscribeOn指定的,而doOnSubscribe的上游observable是subscribeOn生成的,它是ObservableSubscribeOn,在它的訂閱裡面是直接監聽了下游的observer訂閱回撥,也就是doOnSubscribe生成的ObservableDoOnLifecycle訂閱中生成的裝飾DisposableLambdaObserver,它的訂閱監聽會呼叫doOnSubscribe傳進來的Consumer的accept方法。

所以這就是多次呼叫subscribeOn可以透過doOnSubscribe來做執行緒切換的監聽

doOnNext監聽observeOn執行緒的切換,map的apply方法的執行緒由誰控制?

首先我們還是透過例子來回答上面你的問題,先來看doOnNext的使用:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

關於doOnNext其實很好理解,發射資料因為是從上游的observable到下游的observable,而observeOn是指定下游的observer發射資料的執行緒,這個我在上一篇講過,而doOnNext實際生成的是一個ObservableDoOnEach的observable,在該訂閱方法中,會生成裝飾的observer,也就是DoOnEachObserver,所以observeOn實際是控制了DoOnEachObserver發射資料的執行緒,而在它發射onNext資料的時候,會呼叫onNext傳進來的Consumer的accept方法:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

observable裡面還有onComplete、onError的監聽,他們最終都是生成了

ObservableDoOnEach

的observable

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

doOnNext小節

doOnNext中是透過傳進去的Consumer作為上游發射資料過來的監聽,在上游observable發射資料的時候,會執行doOnNext的Consumer的accept方法,所以在上面多次透過observeOn指定執行緒的時候,可以透過doOnNext拿到切換執行緒的。

所以這就是多次呼叫observeOn可以透過onNext來做執行緒切換的監聽。

關於map的apply方法的執行緒由誰來控制,我們這塊直接看map的observable,它是一個

ObservableMap

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

不難看出,map運算子生成的

ObservableMap

,在它的訂閱方法中,生成裝飾的MapObserver,接著給上游的observable新增訂閱,

在MapObserver接收到上游的observable發射onNext資料的時候會呼叫map傳進來的function的apply方法,因此apply的方法是跟上游的observable發射資料的執行緒有關

,我們來看下面例子:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

我們知道subscribeOn是指定上游的observable的訂閱執行緒,我們在上篇文章講過多個subscribeOn指定執行緒,只有第一次有效,這是針對最上游的observable而言的,所以最上游的observable發射資料端的執行緒緊跟它後面指定的io執行緒保持一致,所以會有如下列印:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

而在每一個subscribeOn發射資料的時候不會改變執行緒,所以map的執行緒會保持最上游的observable的執行緒,也就是io執行緒,所以列印會有如下:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

既然subscribeOn不會改變發射資料的執行緒,導致多次subscribeOn不會改變map的執行緒,所以只會跟最上游的observable發射資料的執行緒保持一致,那我們如果中間插入observeOn呢,下面來看下這個例子:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

由於observeOn會改變給下游傳送資料的時候執行緒,也就是改變下游observer接收資料的執行緒,也即onNext、onComplete、onError方法,所以observeOn指定的執行緒會一直傳到了下游MapObserver的onNext方法中,所以最終map中的function的apply方法是main執行緒

,列印結果如下:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

其他情況大家可以嘗試,比如多次指定observeOn執行緒,看map最終的執行緒如何

map小節

map將傳進去的function作為上游發射資料過來的監聽,在上游observable發射資料的時候,會執行function的apply方法來達到轉換資料的目的,所以map中function的apply方法是跟上游的observable發射資料的執行緒有關。

如果不指定observer的執行緒,也就是指設定subscribeOn,而不設定observeOn,那observer的執行緒是什麼樣的?

我感覺理解了整個訂閱的過程,其實理解這個問題一點都不難,既然subscribeOn是指定上游的observable的執行緒,那麼最終的上游observable發射資料時候的執行緒也會被緊挨著它的subscribeOn指定的執行緒有關啊,並且不設定observeOn指定下游的observer的執行緒,那麼observer的執行緒是不是跟最上游observable發射資料的執行緒保持一致啊。

背壓是什麼,以及Flowable怎麼能控制背壓?

它是指由於上游的observable發射資料太快,下游observer接收資料跟不上來導致的一種現象。可以形象理解為水壩在儲存水的時候為了保持水的平衡,給下游的水庫放水,同時會接收上游的水流,如果上游的水流很大,那麼水壩中的水位激增,而水壩給下游放水的能力有限,所以就會導致水壩中的水漫過水壩。

RxJava1.0背壓

注:說到RxJava背壓還得從RxJava1。0開始說起,這裡分析的RxJava1。0版本原始碼是在1。3。8版本分析

Observable。unsafeCreate(new Observable。OnSubscribe() { @Override public void call(Subscriber<? super Integer> subscriber) { int i = 0; while (true) { subscriber。onNext(i); i++; } }})。subscribe(new Observer() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { Log。d(TAG, “onError:” + e。getMessage()); e。printStackTrace(); } @Override public void onNext(Integer integer) { Log。d(TAG, “onNext:” + integer); }});

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

Observable。unsafeCreate(new Observable。OnSubscribe() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; ; i++) { subscriber。onNext(i); } }})。subscribeOn(Schedulers。io())。observeOn(AndroidSchedulers。mainThread())。subscribe(new Observer() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { Log。d(TAG, “onError:” + e。getMessage()); e。printStackTrace(); } @Override public void onNext(Integer integer) { Log。d(TAG, “onNext:” + integer); }});

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

報的是

MissingBackpressureException

異常資訊,其實在RxJava1。0中這就是支援背壓的策略,直接透過異常的資訊反饋給使用者。在RxJava1。0中支援最大的發射資料是16個,也就是說發射大於或等於17個的時候就會出現異常,下面透過程式碼驗證下:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

資料能正常接收,如果我們把資料調整到17呢,是不是會發生異常呢:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

看吧,不用我說啥吧,看來底層是限制了上游傳送資料的個數,其實這個是RxJava1。0背壓策略的一種機制,透過數量來控制,我們可以在這裡找到定義的數量大小:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

RxJava背壓對android平臺做了傳送資料的限制,如果大於16個則直接拋

MissingBackpressureException

異常,

底層透過上游Observable傳送的資料放到佇列中,而這裡的16則是定義佇列的容量,每次在往佇列中放資料的時候會先獲取下一個要放資料的索引,如果發現索引位置的資料不為空,則認為佇列已經滿了,那麼滿了就直接返回onError的資訊。

比如我們在傳送第17個數據的時候,在獲取索引的時候是透過與對接容量16 進行相與得到索引,相與之後得到一個小於16的索引,發現相與之後得到的索引上還有資料,則傳送第17個數據放進佇列的時候失敗,所以直接丟擲onError的資訊。核心原始碼在這裡:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

這個過程還是蠻清晰的,OperatorObserveOn是observeOn方法傳給OnSubscribeLift的Operator物件,在OperatorObserveOn中會先初始化佇列,並且佇列的容量是16,接著在onNext接收上游傳送過來的資料的時候,會判斷佇列的offer是否成功,如果不成功,則直接拋onError的錯誤,那什麼時候offer會失敗呢,得看當前傳送過來的資料是否超過了佇列的容量,如果超過則offer失敗。

所以這就是RxJava1。0中背壓策略,透過設定上游傳送過來的資料的接收佇列容量來達到背壓。

RxJava2.0背壓

注:RxJava執行版本是2。2。20

在RxJava2。0中不再在Observable支援背壓,而是透過Flowable來代替了,也就是說Observable中不再透過異常的形式告訴使用者了,也就是不拋MissingBackpressureException異常了,下面來看看RxJava2。0正常傳送資料的問題:

Observable。create(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { emitter。onNext(i); } }})。subscribeOn(Schedulers。io())。observeOn(AndroidSchedulers。mainThread())。subscribe(new Observer() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Integer integer) { Log。d(TAG, “onNext:” + integer); } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { }})

雖然說不拋MissingBackpressureException異常了,但是記憶體佔用很糟糕啊,所以針對RxJava2。0的問題,我們有沒有處理辦法呢,大家會說RxJava2。0不是已經支援Flowable了嗎,直接使用它啊,如果讓我們自己來處理啊該怎麼辦呢,首先我們分析背壓產生的原因是什麼:

上游傳送的事件太快,下游處理不過來

上游傳送的事件太多,下游處理不過來

首先針對第一種我們可以讓上游傳送速度慢點,怎麼慢點呢,讓io執行緒每次傳送的時候停留一會:

Observable。create(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { emitter。onNext(i); Thread。sleep(1000); } }})。subscribeOn(Schedulers。io())。observeOn(AndroidSchedulers。mainThread())。subscribe(new Observer() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Integer integer) { Log。d(TAG, “onNext:” + integer); } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { }});

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

針對第二種的話,我們可以讓下游的observer少接收點資料:

Observable。create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { //無限迴圈發事件 emitter。onNext(i); } }})。filter(new Predicate() { @Override public boolean test(Integer integer) throws Exception { return integer % 100 == 0; }}) 。subscribeOn(Schedulers。io()) 。observeOn(AndroidSchedulers。mainThread()) 。subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log。d(TAG, “” + integer); } });

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

知道了背壓產生的原因後,我們再回頭看RxJava本身用Flowable來支援背壓策略,而且它的策略比較豐富,下面來一一介紹,我們先從Flowable。create入手:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

Observable。unsafeCreate(new Observable。OnSubscribe() { @Override public void call(Subscriber<? super Integer> subscriber) { int i = 0; while (true) { subscriber。onNext(i); i++; } }})。subscribe(new Observer() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { Log。d(TAG, “onError:” + e。getMessage()); e。printStackTrace(); } @Override public void onNext(Integer integer) { Log。d(TAG, “onNext:” + integer); }});

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

在策略為error情況下,並且沒切換執行緒的時候,直接報

io。reactivex。exceptions。MissingBackpressureException: create: could not emit value due to lack of requests

錯誤,該錯誤是告訴你沒對下游的observer設定request的方法,這個是由於在單執行緒情況下,沒預設給observer設定處理資料的能力,也即是個數,所以上游不知道下游的處理能力,直接拋error錯誤。

下面怎麼設定下游處理能力呢:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

直接下游接收到訂閱方法中新增

subscription。request(Long。MAX_VALUE)

,官方建議我們使用

Long。MAX_VALUE

,表示告訴上游,下游的處理能力最大,你儘管傳送資料給我吧。

那如果上游傳送的資料個數大於下游設定的個數呢:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

可以看到在傳送第4個數據的時候,直接拋異常了,因為下游設定的處理能力是3個,每次在傳送完一次的時候,會“

削弱

”,下游的處理資料的能力,等到傳送第四個資料的時候,發現下游已經不能再處理了,直接拋異常。

上面都是在error策略,單執行緒下的結果,那如果在多執行緒中結果會是咋樣呢,還得從幾種策略情況來看:

MISSING

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

在多執行緒下必須設定下游的處理能力,因為在observeOn給下游傳送資料的時候需要知道下游能處理資料的個數。上面我們演示的是上游傳送128個數據,結果沒有像MISSING策略所說的丟擲丟擲MissingBackpressureException或IllegalStateException異常資訊,這是因為Flowable預設認為128個數據是上游傳送最多的資料,我們可以透過這裡找到定義的數量:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

底層其實是跟RxJava1。0點的做法是一樣的,也是把這個容量作為佇列的大小,只不過RxJava1。0的容量是16個,所以再發送第129個數據的時候,會出現佇列放滿的問題,一旦放滿,再往裡面放資料就會出現RxJava中定義的各種策略情況,下面我們把傳送資料改為129個,看看MISSING會出現什麼情況:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

看到了吧,直接拋

io。reactivex。exceptions。MissingBackpressureException: Queue is full?!

異常資訊。

MISSING內部的發射器裡面其實啥都沒做,它傳送異常是在FlowableObserveOn.ObserveOnSubscriber內部類的onNext時候,發現128個大小的佇列滿了後,給下游的observer傳送onError的資訊。

ERROR

測試程式碼我就不貼了,直接把上面上面的

BackpressureStrategy。MISSING

改為

BackpressureStrategy。ERROR

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

其實和MISSING丟擲的異常是一樣的,只不過異常的message不一樣而已。

內部透過傳送資料的時候定義一個AtomicLong的計數器,每次在給下游傳送完一個數據後,會將該計數器減一,等到減到0的時候,直接在上游給下游傳送onError的資訊。

BUFFER

這個其實跟RxJava2。0的Observable使用沒什麼區別,輸出的容量沒有大小限制,也不會像RxJava1。0一樣拋異常,請謹慎使用。

DROP

drop是在第一次拿到128個數據後,第二次從佇列中拿資料的時候,中間跟不上速度的資料拋棄了,等到下游處理完先前的128個數據的時候,才能接收後面96個數據,至於這裡為什麼是96個數字,是因為後面的容量減為

this。limit = prefetch - (prefetch >> 2);

這個大小了,prefetch是128,大家自己算吧:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

這裡從5118。。。。96個數。。。從5118算起第96個數

DROP是當給下游傳送資料的時候,自己有個限流的策略,透過AtomicLong裝載的128大小的計數器,每次傳送完一個數據後,會將該計數器減一,那如果傳送到了128個數據的時候,由於計數器減到0了,等到下游處理完這128個數據的時候,才會把計數器給調整到96,所以中間會出現丟資料的情況,等到下游處理完先前128個數據的時候,上游再次發資料的時候已經不會從129個數開始了,而且發的這96個數是隨機的,因為下游處理前面128個數的時間是不確定的。

LATEST

我們先來看latest是什麼樣的效果,為了要區分和drop的效果,我們將發射資料改為2000的資料量:

Flowable。create(new FlowableOnSubscribe() { @Override public void subscribe(FlowableEmitter emitter) throws Exception { for (int i = 0; i < 2000; i++) { emitter。onNext(i); } }}, BackpressureStrategy。LATEST)。subscribeOn(Schedulers。io())。observeOn(AndroidSchedulers。mainThread()) 。subscribe(new Subscriber() { @Override public void onSubscribe(Subscription s) { Log。d(TAG, “onSubscribe”); s。request(Long。MAX_VALUE); subscription = s; } @Override public void onNext(Integer integer) { Log。d(TAG, “onNext: ” + integer); } @Override public void onError(Throwable t) { Log。w(TAG, “onError: ”, t); } @Override public void onComplete() { Log。d(TAG, “onComplete”); } })

這裡得到的結果會是這樣的:

繼上篇,關於Rxjava,讓你知道什麼叫“大海無量”話題

它是這麼接收資料的:0-127。。。。96個數。。。最後一個數,latest是先發送0到127的資料,然後中間傳送96個數字,這中間會有丟失資料的,而最後會把最後一個數據傳送給下游。

latest沒有像drop那樣透過計數器的形式限制傳送資料的速度,而是在傳送資料的時候定義了AtomicReference原子類,把資料放在裡面,所以它是每次只能儲存一個數據,等處理完前面的128個數據的時候,會將AtomicLong定義的128個數量減到0,所以在下游接收完前面的128個數的時候,上游才能給下游傳送後面96個數,等到最後的時候會由於快取的是最後一個數,所以只能傳送給下游的只能是最後一個數。

好了,關於背壓的幾種策略就那麼幾種,其實我們總結下來:

MISSION:上游沒有限流,在下游裡面發現佇列滿了,給下游傳送onError的資訊,該資訊是

io。reactivex。exceptions。MissingBackpressureException: Queue is full?!

ERROR:在上游透過限流的形式給下游傳送資料,在發現數據量到了128個的時候,會給下游傳送onError的資訊,該資訊是

create: could not emit value due to lack of requests

DROP:也是在上游透過限流的形式給下游傳送資料,在發現數據量到了128的時候,會等下游處理完這128個數據,等到處理完了,繼續處理梳理,所以在等的過程中會有資料丟失的問題。

LATEST:雖然和DROP都是同樣的丟資料,但是它兩的做法是不一樣的,LATEST透過只能放一個數據的容易來給下游傳送資料,最開始丟資料基本是一樣的,但是LATEST會保留最後一條資料,是因為最後處理資料的時候,容器裡面還有一條資料。

BUFFER:這個跟RxJava2。0普通傳送資料是一樣的,它不支援背壓,上游傳送多少資料,下游會接收多少資料,直到發生OOM。

總結

介紹了doOnSubscribe監聽每次subscribeOn的執行緒切換

doOnNext監聽每一次observeOn執行緒的切換,以及map的apply方法的執行緒是由上游傳送資料的observable決定的。

如果不指定observer的執行緒,也就是指設定subscribeOn,而不設定observeOn,那observer的執行緒跟上游的observable一起走的。

介紹了RxJava1。0和RxJava2。0背壓的使用,以及他們的區別。

關於Rxjava第二篇文章就介紹完了,如果還有什麼不懂的地方可以直接留言問我。