RxJava中怎么实现取消订阅功能

RxJava中怎么实现取消订阅功能,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

手动取消订阅

Consumer类型

Observable创建返回Disposable取消

public class SecondActivity extends AppCompatActivity {  private static final String TAG = "SecondActivity";  private Disposable disposable;  @Override  protected void onCreate(Bundle savedInstanceState) {    super.onCreate(savedInstanceState);    setContentView(R.layout.activity_second);    disposable = Observable.create(new ObservableOnSubscribe<String>() {      @Override      public void subscribe(ObservableEmitter<String> emitter) throws Exception {        try {          Thread.sleep(5000);        } catch (InterruptedException e) {          e.printStackTrace();        }      }    }).subscribeOn(Schedulers.io())        .observeOn(AndroidSchedulers.mainThread())        .subscribe(new Consumer<String>() {          @Override          public void accept(String s) throws Exception {            Log.d(TAG, "accept: "+s);          }        });  }  @Override  protected void onDestroy() {    super.onDestroy();    Log.d(TAG, "onDestroy: ");    //取消订阅    if(disposable != null && !disposable.isDisposed()){      disposable.dispose();      Log.d(TAG, "onDestroy: dispose");    }  }}

普通类型Observer

在Observer中获取Disposable然后取消

public class ThirdActivity extends AppCompatActivity {  private static final String TAG = "ThirdActivity";  Disposable disposable;  @Override  protected void onCreate(Bundle savedInstanceState) {    super.onCreate(savedInstanceState);    setContentView(R.layout.activity_third);    Observable.create(new ObservableOnSubscribe<String>() {      @Override      public void subscribe(ObservableEmitter<String> emitter) throws Exception {        try {          Thread.sleep(5000);          emitter.onNext("testInfo");        } catch (InterruptedException e) {          e.printStackTrace();        }      }    }).subscribeOn(Schedulers.io())        .observeOn(AndroidSchedulers.mainThread())        .subscribe(new Observer<String>() {          @Override          public void onSubscribe(Disposable d) {            disposable = d;          }          @Override          public void onNext(String s) {            Log.d(TAG, "onNext: "+s);          }          @Override          public void onError(Throwable e) {            Log.d(TAG, "onError: ");          }          @Override          public void onComplete() {            Log.d(TAG, "onComplete: ");          }        });  }  @Override  protected void onDestroy() {    super.onDestroy();    Log.d(TAG, "onDestroy: ");    //然后在需要取消订阅的地方调用即可    if (disposable != null && !disposable.isDisposed()) {      Log.d(TAG, "dispose: ");      disposable.dispose();    }  }}

DisposableObserver类型

利用DisposableObserver和SubscribeWith直接返回Disposable,然后取消

public class FourthActivity extends AppCompatActivity {  private static final String TAG = "FourthActivity";  private DisposableObserver<String> observer;  @Override  protected void onCreate(Bundle savedInstanceState) {    super.onCreate(savedInstanceState);    setContentView(R.layout.activity_fourth);    observer = Observable.create(new ObservableOnSubscribe<String>() {      @Override      public void subscribe(ObservableEmitter<String> emitter) throws Exception {        try {          Thread.sleep(5000);          emitter.onNext("testInfo");        } catch (InterruptedException e) {          e.printStackTrace();        }      }    }).subscribeOn(Schedulers.io())        .observeOn(AndroidSchedulers.mainThread())        .subscribeWith(new DisposableObserver<String>() {      @Override      public void onNext(String o) {        Log.d(TAG, "onNext: "+o);      }      @Override      public void onError(Throwable e) {        Log.d(TAG, "onError: ");      }      @Override      public void onComplete() {        Log.d(TAG, "onComplete: ");      }    });  }  @Override  protected void onDestroy() {    super.onDestroy();    if (observer != null && !observer.isDisposed()) {      Log.d(TAG, "dispose: ");      observer.dispose();    }  }}

取消多个Observer

把多个Observer添加CompositeDisposable,一次取消

public class ComDisposableActivity extends AppCompatActivity {  private Disposable disposable1;  private Disposable disposable2;  private static final String TAG = "ComDisposableActivity";  @Override  protected void onCreate(Bundle savedInstanceState) {    super.onCreate(savedInstanceState);    setContentView(R.layout.activity_com_disposable);    Observable.create(new ObservableOnSubscribe<String>() {      @Override      public void subscribe(ObservableEmitter<String> emitter) throws Exception {        try {          Thread.sleep(5000);          emitter.onNext("testInfo");        } catch (InterruptedException e) {          e.printStackTrace();        }      }    }).subscribeOn(Schedulers.io())        .observeOn(AndroidSchedulers.mainThread())        .doOnDispose(new Action() {          @Override          public void run() throws Exception {            Log.d(TAG, "run: Unsubscribing subscription from onCreate()");          }        })        .subscribe(new Observer<String>() {          @Override          public void onSubscribe(Disposable d) {            disposable1 = d;          }          @Override          public void onNext(String s) {            Log.d(TAG, "onNext: "+s);          }          @Override          public void onError(Throwable e) {            Log.d(TAG, "onError: ");          }          @Override          public void onComplete() {            Log.d(TAG, "onComplete: ");          }        });    Observable.create(new ObservableOnSubscribe<String>() {      @Override      public void subscribe(ObservableEmitter<String> emitter) throws Exception {        try {          Thread.sleep(5000);          emitter.onNext("testInfo");        } catch (InterruptedException e) {          e.printStackTrace();        }      }    }).subscribeOn(Schedulers.io())        .observeOn(AndroidSchedulers.mainThread())        .subscribe(new Observer<String>() {          @Override          public void onSubscribe(Disposable d) {            disposable2 = d;          }          @Override          public void onNext(String s) {            Log.d(TAG, "onNext: "+s);          }          @Override          public void onError(Throwable e) {            Log.d(TAG, "onError: ");          }          @Override          public void onComplete() {            Log.d(TAG, "onComplete: ");          }        });  }  @Override  protected void onDestroy() {    super.onDestroy();    CompositeDisposable compositeDisposable = new CompositeDisposable();    //批量添加    compositeDisposable.add(disposable1);    compositeDisposable.add(disposable2);    //最后一次性全部取消订阅    compositeDisposable.dispose();  }}

RxLifecyle取消

OnDestory取消

Observable.interval(1, TimeUnit.SECONDS)        .doOnDispose(new Action() {          @Override          public void run() throws Exception {            Log.d(TAG, "Unsubscribing bindToLifecycle from onDestroy()");          }        })        .compose(this.<Long>bindToLifecycle())        .subscribe(new Consumer<Long>() {          @Override          public void accept(Long num) throws Exception {            Log.d(TAG, "accept: " + num);          }        });

指定生命周期取消

Observable.interval(1,TimeUnit.SECONDS)        .doOnDispose(new Action() {          @Override          public void run() throws Exception {            Log.d(TAG, "Unsubscribing UbindUntilEvent from onPause()");          }        }).compose(this.<Long>bindUntilEvent(ActivityEvent.PAUSE))        .subscribe(new Consumer<Long>() {          @Override          public void accept(Long aLong) throws Exception {            Log.d(TAG, "bindUntilEvent accept: " + aLong);          }        });

看完上述内容,你们掌握RxJava中怎么实现取消订阅功能的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!