RxJava私的メモ
Observableの生成
・from
IterableなオブジェクトからObservableを生成。
Observable.from(T[] array) Observable.from(Iterable<? extends T> iterable) Observable.from(new String[]{"a", "b", "c"})
・create
OnSubscribeを指定してObservableを生成。
Observable<String> o = Observable.create(subscriber -> { subscriber.onNext("Hello"); subscriber.onNext("world!"); subscriber.onCompleted(); });
・range
要素の範囲を指定してObservableを生成。
Observable<Integer> o = Observable.range(1, 5);
・just
引数で直接Observableを生成
Observable<Integer> o = Observable.just(1, 2, 3);
subscribeOnとobserveOn
・subscribeOn
ObservableのonSubscribe を実行するスレッドを変更する。
subscribeOnはどのタイミングで呼んでも全てのオペレータに適用される。
・observeOn
以降のオペレータでのコールバックを処理するスケジューラをきめる。
・指定しない場合
Observableの実装や叩いたOperator(map()やfilter()などのメソッド)によりますが、基本的に現在のスレッド(subscribe()を呼び出したスレッド)で「同期的に」実行されます。
Schedulers
指定したオペレータをどのスレッドで実行するかを指定できます。
最初はSchedulers#newThreadとAndroidSchedulers#mainThreadから入ると良いと思います。
・#io
I/O処理のためのScheduler。
・#computation
計算処理のためのScheduler。イベントループ 処理コールバックなど。
・#immediate
現在のスレッドで作業するためのScheduler。処理を割り込んで実行させます。
・#newThread
新規にスレッドを生成します。
・#trampoline
擬似的なマルチタスクとして実行します。
・AndroidSchedulers#mainThread()
メイン(UI)スレッドで実行します。 (RxAndroidを導入する必要あり)
ライフサイクル
意識しないといけないのはsubscribeメソッドが発行するSubscription。
・ライフサイクルに合わせてunsubscribeする
SubscriberのonNext, onErrorでunsubscribeが呼ばれる。
非同期のSubscriptionは投げっぱなしにするとリークするので適所でunsubscribe!
・複数のSubscription
CompositeSubscriptionを利用します。Subscriptionが発行される都度CompositeSubscription#addする。
こちらもライフサイクルに合わせてunsubscribeする。
error operators
通常はsubscribe内でonErrorを実装する. subscribe内がonNextだけの実装になっている場合は,
下記のメソッドを実装する必要がある.
・onErrorReturn
Observableチェインのなかで発生したErrorをキャッチして、大体のObjectに変換することでsubscriberにErrorが渡されるのを防ぐことができます。
・onErrorResumeNext
Observableチェインのなかで発生したErrorをキャッチして、その中で再度Observableを呼び出すことで、エラー時に代替のStreamを返すことができます。
ハマりどころ
onErrorにコンテキストの参照をもたせるとリークする
既存処理をRxに移行する
// 非同期処理に乗せるAPIからのデータ取得メソッド
public List
↓
// 戻り値の型をObservableでラップ
public Observable<List