rxjava总结
RxJava2.0
- 特点
- 实现优雅
- RxJava的实现方式 = 基于事件流的链式调用
- 逻辑简洁
- 使用简单
- 随着程序逻辑变得复杂性提高,它依然能保持简洁和优雅
- 实现优雅
- 定义
- 一个基于事件流,实现异步操作的库
- 作用
- 实现异步操作
- 注: 类似于 Android 中的 AsyncTask, handler 作用
- 使用步骤
- 使用方式1
- 1:创建被观察者 (Observable )& 生产事件
- 顾客入饭店 - 坐下餐桌 - 点菜
- 顾客入饭店 - 坐下餐桌 - 点菜
- 2:创建观察者 (Observer )并 定义响应事件的行为
- 开厨房 - 确定对应菜式
-
- 开厨房 - 确定对应菜式
- 3:通过订阅(Subscribe)连接观察者和被观察者
- 顾客找到服务员 - 点菜 - 服务员下单到厨房 - 厨房烹调
- 顾客找到服务员 - 点菜 - 服务员下单到厨房 - 厨房烹调
- 1:创建被观察者 (Observable )& 生产事件
- 使用方式2
基于事件流的链式调用
- 使用方式1
- RxJava操作符一览表
- 操作符
- 创建操作符
- 应用场景
- 完整和快速创建被观察者
- 定时操作
- 周期性操作
- 数组 / 集合 遍历
- 类型
- 基本创建
- create() 创建 Observabe 基本操作符
- 完整创建被观察者对象
- RxJava创建对象基本操作符
- 作用
- 完整创建1个被观察者对象
- create() 创建 Observabe 基本操作符
- 快速创建和发送
- just()
- 快速创建一个被观察者对象 发送10个以内的事件
- fromArray()
- 直接发送 传入的数组数据,可以发送10个已上的事件
- fromIterable()
- 直接发送 传入的集合List数据,可以发送10个已上的事件
- nerver()
- 该方法创建的被观察者对象发送事件的特点:不发送任何事件
- empty()
- 该方法创建的被观察者对象发送事件的特点:仅发送Complete事件,直接通知完成
- error()
- 该方法创建的被观察者对象发送事件的特点:仅发送Error事件,直接通知异常
- just()
- 延迟创建
- defer()
- 直到有观察者(Observer )订阅时,才动态创建被观察者对象(Observable) & 发送事件
- 示例
- timer()
- 快速创建1个被观察者对象(Observable)
- 发送事件的特点:延迟指定时间后,发送1个数值0(Long类型)
- interval()
- 快速创建1个被观察者对象(Observable)
- 发送事件的特点:每隔指定时间 就发送 事件
- intervalRange()
- 快速创建1个被观察者对象(Observable)
- 发送事件的特点:每隔指定时间 就发送 事件,可指定发送的数据的数量
- range()
- 发送事件的特点:连续发送 1个事件序列,可指定范围
- 发送事件的特点:连续发送 1个事件序列,可指定范围
- rangeLong()
- defer()
- 基本创建
- 应用场景
- 变换操作符
- 类型
- Map()
- 对 被观察者发送的每1个事件都通过 指定的函数 处理,从而变换成另外一种事件
- 原理
- 应用场景
- 数据类型转换
- 使用
- FlatMap()
- 应用场景
- 无序的将被观察者发送的整个事件序列进行变换
- 作用
- 将被观察者发送的事件序列进行 拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送
- 原理
- 为事件序列中每个事件都创建一个 Observable 对象;
- 将对每个 原始事件 转换后的 新事件 都放入到对应 Observable对象;
- 将新建的每个Observable 都合并到一个 新建的、总的Observable 对象;
- 新建的、总的Observable 对象 将 新合并的事件序列 发送给观察者(Observer)
- 使用
- 应用场景
- Buffer()
- 作用
- 定期从 被观察者(Obervable)需要发送的事件中 获取一定数量的事件 & 放到缓存区中,最终发送
- 原理
- 应用场景
- 缓存被观察者发送的事件
- 具体使用
- Buffer()每次是获取多少个事件放到缓存区
- Buffer()每次是获取多少个事件放到缓存区
- 作用
- ContactMap()
- 与FlatMap()的 区别在于:拆分 & 重新合并生成的事件序列 的顺序 = 被观察者旧序列生产的顺序
- 使用场景
- 有序的将被观察者发送的整个事件序列进行变换
- 原理
- 使用详解
- Map()
- 作用
- 对事件序列中的事件 / 整个事件序列 进行加工处理(即变换),使得其转变成不同的事件 / 整个事件序列
- 具体原理
- 类型
- 组合 / 合并操作符
- 作用
- 组合 多个被观察者(Observable) & 合并需要发送的事件
- 按发送顺序
- conact
- 使用场景 从内存,硬盘,网络获取获取网络缓存数据
- 使用到 conact 和 和 firstElement ()方法
- 参考:https://blog.****.net/carson_ho/article/details/78455449
- 使用场景 从内存,硬盘,网络获取获取网络缓存数据
- conact
- 按时间
- merge()
- mergeArray()
- 错误处理
- contactDelayError()
- mergeDelayError()
- 按发送顺序
- 合并多个事件
- 按数量
- zip()
- combineLatest()
- https://blog.****.net/carson_ho/article/details/78455624
- 需求场景:同时对多个事件进行联合判断 eg:比如使用用户登录的时候账号密码输入之后才能让登录按钮设置能点击状态
- mergeArray()
- 按时间
- 合并成一个事件发送
- 按数量
- 发送事件前追加发送事件
- startWith()
- startWithArrary()
- 统计发送事件的数量
- count()
- 组合 多个被观察者(Observable) & 合并需要发送的事件
- 实际开发使用
- 从磁盘 / 内存缓存中获取缓存数据
- 使用
- 使用
- 合并数据源
- zip
- Merge
- zip
- 联合判断
- 从磁盘 / 内存缓存中获取缓存数据
- 作用
- 功能性操作符
- 作用
- 辅助被观察者(Observable) 在发送事件时实现一些功能性需求
- 类型
- 连接(订阅)观察者 和 被观察者
- 订阅
- subscribe()
- 线程调度(切换)
- subscribeOn()
- observeOn()
- 错误处理
遇到错误时的机制- retryWhen()
- retryUntil
- retry() 作用:重试,即当出现错误时,让被观察者(Observable)重新发射数据
- onExceptionResumeNext()
- 遇到错误时,发送1个新的Observable
- onResumeNext()
- onErrorReturn()
- 遇到错误时,发送1个特殊事件 & 正常终止
- onErrorResumeNext()
- 遇到错误时,发送1个新的Observable
- 事件生命周期操作
- 生命周期中调用
- do()
- do()
- 延时操作
- delay()
- 重复发送操作
- repeat()
- repeatWhen()
- 连接(订阅)观察者 和 被观察者
- 实际开发使用
- 线程控制(切换/调度)
- 作用
- 指定 被观察者 (Observable) / 观察者(Observer) 的工作线程类型。
- 为什么要这么用进行线程调度?
- 因为创建的被观察者(Observerable) / 观察者(Observer) 的线程 = 主线程
- 所以生产事件 / 接收& 响应事件都发生在主线程
- 测试代码
- 作用
- 网络请求出错重连
- 使用
- 使用
- 轮询
- 线程控制(切换/调度)
- 作用
- 过滤操作符
- 作用
- 过滤 / 筛选 被观察者(Observable)发送的事件 & 观察者 (Observer)接收的事件
- 类型
- 根据 指定条件 过滤事件
- Filter()
- ofType()
- skip()
- skipLast()
- distinct()
- 过滤事件序列中重复的事件 / 连续重复的事件
- disinctUntilChanged()
- 根据 指定事件数量 过滤事件
- take()
- takeLast()
- 根据 指定时间 过滤事件
- throttleFirst()
- throttleLast()
- Sample()
- throttleWithTimeout()
- deboune()
- 根据 指定事件位置 过滤事件
- firstElement()
- lastElement()
- elementAt()
- elementAtOnError()
- 实际开发应用
- 功能抖动
- 无论点击多少次,也只会响应一次点击事件
- 无论点击多少次,也只会响应一次点击事件
- 联系搜索优
- 功能抖动
- 根据 指定条件 过滤事件
- 作用
- 条件 / 布尔操作符
- 作用
- 通过设置函数,判断被观察者(Observable)发送的事件是否符合条件
- 类型
- all()
- exits()
- contains()
- isEmpty()
- amb()
- takeWhile()
- takeUntil()
- skipUntil()
- defaultEmpty()
- sequenceEqual()
- 作用
- 创建操作符
- 背压策阅
- 原理
- 1. 避免出现事件发送 & 接收不匹配的情况
- 控制观察者接收发送事件的速度
- 面向对象
- 观察者
- 原理
- 相应式拉取
- 观察者根据自身情况按需接收事件
- 面向对象
- 控制被观察者接收事件的速度
- 面向对象
- 被观察者
- 原理
- 反馈控制
- 被观察者根据观察者接收事件能力大小,从而控制发送事件概率
- 面向对象
- 控制观察者接收发送事件的速度
- 2. 当出现发送事件 & 接收事件速率不匹配时的解决方案
- 采用背压策阅
- 面向对象
- 缓存区
- 原理
- 对超出缓存区大小的事件抛错,保留,报错等措施
- 面向对象
- 采用背压策阅
- 1. 避免出现事件发送 & 接收不匹配的情况
- 具体实现(Flowable)
- 对应的观察者变为 Scheduler
- 所有操作符强制支持背压
- 缓存区存放策阅
- 默认缓存区大小 128
- 需要解决的问题
- 流速不匹配, 即发送事件的速度 > 接收事件的速度
- 具体表现, 当缓存区存满(默认缓存区 = 128), 被观察者继续发送下一个事件时
- 解决方案
- 手动创建Flowable
- BackpressureStragy.Error
- 直接抛出 MissingBackpresureException
- BackpressureStragy.Missing
- 友好提示: 缓存区满了
- BackpressureStragy.Buffer
- 将缓存区设置成无限大
- 即被观察者 可 无限发送事件 观察者
- 实际上是存放缓存区
- 注意内存情况,防止出现 OOM
- BackpressureStragy.DROP
- 超过缓存大小(128)的事件丢弃
- BackpressureStragy.Latest
- 只保存最新(最后)的事件,超过缓存区的事件(128)的事件丢弃
- BackpressureStragy.Error
- 自动创建Flowable
- onBackPressureBuffer()
- onBackPressureDrop()
- onBackPressureLastest()
- 默认采用 BackpressureStragy.Error 模式
- 作用同模式参数
- 手动创建Flowable
- 原理
- 与常用的开源库结合使用
- Retrofit
- RxBinding
- RxBus
- 原理
- 被观察者 (Observable) 通过 订阅(Subscribe) 按顺序发送事件 给观察者 (Observer)
- 被观察者 (Observable) 通过 订阅(Subscribe) 按顺序发送事件 给观察者 (Observer)
- 注意事项
- 可采用 Disposable.dispose() 切断观察者 与 被观察者 之间的连接
- 可采用 Disposable.dispose() 切断观察者 与 被观察者 之间的连接
- 使用方式
- 源码解析
- 参考