Rxjs 总结(一)
操作符汇总
在Rx的世界发生的一切现象都是围绕着数据流展开的,数据流以Observable对象的的形式存在,而对Rx的应用实际就是使用操作符对数据流进行操作。按照使用方式,大体上可以将操作符划分为四类,分别是创建类,合并类,过滤类,转化类操作符。
创建类操作符
顾名思义,创建类数据流的产出是一个Observable对象。
一. 创建同步数据流
create :create的本质是直接调用Observable的构造函数。
1 | Observable.create = function(subscribe) { |
of : 利用of操作符可以轻松产生包含给定数据集合的Observable对象。
1 | Rx.Observable.of(1,2,3).subscribe(console.log); |
range : 返回一定范围上,以第一个参数为起点,持续加一作为数据的Observable对象
1 | Rx.Observable.range(1,100).subscribe(console.log); |
generate : 类似于for循环,包含条件判断,递增值设置,返回结果规则,创建一组Observable。
1 | Rx.Observable.generate(2,v <= 10, v => v+2, v => v**2).subscribe(console.log) |
repeat : repeat操作符是一个实例操作符,能够复制上游的Observable中的数据若干次,使用repeat操作符实际上是将Observable订阅了若干次 。
1 | Rx.Observable.of(1,2,3).repeat(3).subscribe(console.log); |
empty : 产生一个空的直接完结的Observable对象。
throw : 产生一个直接抛出错误的Observable对象。
1 | Rx.Observable.throw(new Error('Test')).subscribe(console.log, console.log, console.log('complete')) |
never : 产生一个既不抛错也不完结,也没有什么动作,就那么呆着的一个Observable。。。。。
1 | Rx.Observable.generate(2,v => v <= 10, v => v+2, v => v**2).subscribe(console.log) |
二. 创建异步数据流
interval : 创建出一个根据设置时间递增的Observable对象,interval不会主动停止,需要下游调用complete才会终止。
1 | Rx.Observable.interval(1000).subscribe(console.log) |
timer : (时间/毫秒数, ?时间间隔)=> 若没有第二个参数,timer会在指定时间返回0,若有第二个参数则以第一个参数为起点,第二个参数为时间间隔,持续产生一个递增的Observable对象。
1 | Rx.Observable.timer(Date.now() + 1000).subscribe(console.log) |
三. 把其他类型的数据转化为Observable
from : 可以把任何js对象转化为Observable对象。
1 | Rx.Observable.from([1,2,3]).subscribe(console.log); |
fromPromise : 将promise类型的对象转化为Observable 对象;
1 | Rx.Observable.fromPromise(Promise.resolve("test")).map(v => `hello, ${v}`).subscribe(console.log) |
fromEvent : 将DOM事件,或nodejs的event对象转化为Observable;
1 | Rx.Observable.fromEvent(document.getElementById('app'), 'click').subscribe(console.log); |
四. 其他
repeatWhen : 相比于repeat可控的重复订阅上游。
1 | Rx.Observable.of(1,2,3).repeatWhen((notifier$) => notifier$.delay(1000)).subscribe(console.log) |
defer : 延迟Observable创建,仅在被订阅的时候创建。
1 | const fetchData = () => Observable.ajax(url); |
合并类操作符
随着需求的复杂化,单一的操作符很难满足需求,因此操作符之间的组合让Rx发挥更大的威力。
功能 | 操作符 |
---|---|
把多个数据流以先到先得的方式合并 | merge, mergeAll |
把多个数据流首尾相接的方式合并 | concat,concatAll |
把多个数据流以一一对应的方式合并 | zip,zipAll |
持续合并多个数据流中最新产生的数据 | combineLatest, combineAll, withLatestFrom |
从多个数据流中选取第一个产生内容的数据流 | race |
在数据流前面添加一个指定数据 | startWith |
只获取多个数据流最后产生的那个数据 | forkJoin |
从高阶数据流中切换数据源 | switch, exhaust |
concat : 首尾相接
1 | Rx.Observable.of(1,2,3).concat(Observable.of(4,5,6)).subscribe(console.log) |
merge : 先到先得
1 | const mouseUp$ = fromEvent(document, 'mouseup'); |
zip : 一一对应
1 | //zip 会将上游数据转化为数组形式 |
combineLatest : 合并最后一个数据
1 | const run1$ = timer(500, 1000); |
withLatestForm : 仅根据上游的单一数据源向下游推送数据
1 | const timer1$ = timer(0, 1000); |
race :类似Promise.race,仅处理最先到达的Observable
1 | const timer1$ = timer(0, 1000); |
startWith : 在被订阅时先突出一定的数据
1 | const timer1$ = timer(0, 1000).startWith('start'); |
forkJoin : 等待所有Observable对象处理完结,选取每个最后一个对象传递给下游。
1 | const timer1$ = timer(0, 1000).take(1); |
switch: 总是切换到最新的内部Observable对象获取数据;
1 | const stream$ = Observable.interval(1000).take(2).map(v => Observable.interval(1500).map(v => `${x},${y}`).take(2)); |
exhaust : 耗尽当前Observable之前不会切换到下一个;
1 | const stream$ = Observable.interval(1000).take(2).map(v => Observable.interval(1500).map(v => `${x},${y}`).take(2)); |
辅助操作符
功能 | 操作符 |
---|---|
统计数据流中所有数据的个数 | count |
获得数据流中最大和最小的数据 | Max, min |
对数据流传输数据进行规约操作 | reduce |
判断是否所有数据满足判断条件 | every |
找到的一个满足判定条件的数据 | Find, findIndex |
判断一个数据流是否不包含任何数据 | isEmpty |
如果一个数据流为空就默认产生一个指定数据 | defaultIfEmpty |
count : 统计上游Observable对象吐出的所有数据。
1 | Observable.timer(1000).concat(Observable.timer(100)).count(); |
max, min : 获得Observable对象的最大值和最小值。
1 | Observable.of(1,2,3).min((a,b) => a - b) |
reduce : 规约统计,类似于JavaScript的数组方法reduce。
1 | Rx.Observable.of(1,2,3).reduce((acc, curr) =>acc + curr).subscribe(console.log); |
every :类似数组方法every,当所有的Observable对象满足某个条件时,返回true,否则false;
1 | Rx.Observable.of(1,2,3).every((x) => x > 2) |
find,findIndex :类似数组方法;
1 | Rx.Observable.of(1,2,3).find((x) => x > 2).subscribe(console.log); |
isEmpty :
1 | Rx.Observable.create(observer => { |
defaultIfEmpty :
1 | Rx.Observable.create(observer => { |
过滤数据流
功能 | 操作符 |
---|---|
过滤掉不满足条件的操作符 | filter |
获得满足条件的第一个数据 | first |
获得满足判断条件的最后一个条件 | last |
从数据流中取出最先出现的若干年数据 | take |
从数据流中取出最后出现的若干数据 | takeLast |
从数据流中选取数据直到某种情况发生 | takeWhile,takeUntil |
从数据流中忽略最先出现的若干数据 | skip |
从数据流中忽略数据直到某种情况发生 | skip, skipUntil |
基于时间的数据流量筛选 | throttleTime, debounceTime, auditTime |
基于数据内容的数据流量筛选 | throttle,debounce,audit |
基于采样方式的数据流量筛选 | sample,sanpleTime |
删除重复的数据 | distinct |
删除重复的连续数据 | distinceUntilChanged, distinctUntilKyChanged |
忽略数据流中的所有数据 | ignoreElements |
只选取指定出现位置的数据 | elementAt |
判断是否只有一个数据满足判定条件 | single |
filter :类似数组的filter方法,过滤掉不满足条件的数据。
1 | Observable.of(1,2,3).filter(v => v%2 == 0).subscribe(console.log); |
first : 满足条件的第一个数据。
1 | Observable.of(1,2,3).first(v => v%3 == 0).subscribe(console.log); |
last : 与first相反。
1 | Observable.of(1,2,3).last(v => v%3 == 0).subscribe(console.log); |
take,takeLast,takeWhile, takeUntil : 从数据流中取出多个数据,或按照条件中取出多个数据。
1 | Observable.of(1,2,3,4).take(3).subscribe(console.log); |
**skip ** :跳过前n个
1 | Observable.of(1,2,3).skip(1).subscribe(console.log); |
skipWhile, skipUntil
1 | Observable.interval(1000).skipWhile(v => v%2 == 0).subscribe(console.log) |
throttleTime : 在限定时间范围内,从上游向下游传递数据的个数。
1 | Observable.interval(1000).throttleTime(2000).subscribe(console.log); |
debounceTime : 让传递给下游数据间隔不能够小雨给定时间。
1 | Observable.interval(1000).filter(v => v % 3 === 0).debounceTime(2000).subscribe(console.log); |
throttle : 根据duorationSelector触发的时机来传递上游的值。
1 | Observable.interval(1000).throttle(v => { |
debounce : 根据duorationSelector触发的时机来传递上游的值。
1 | Observable.interval(1000).debounce(v => { |
auditTime, audit : 与throttle类似但是传递的是最后一个数据。
1 | Observable.interval(1000).auditTime(3000).subscribe(console.log); |
sample, sampleTime: 根据规则在一个范围内取一个数据抛弃其他数据。
1 | Observable.interval(1000).sampleTime(3000).subscribe(console.log); |
distinct: 只返回从没有出现过的数据。
1 | Observable.of(0,1,2,3,3,4,5,5,6).distinct().subscribe(console.log); |
distinctUntilChanged, distinctUntilKeyChanged : 删除掉和上一个相同的数据。
1 | Observable.of(0,1,2,3,3,4,5,5,6,1).distinctUntilChanged().subscribe(console.log); |
转化操作符
功能 | 操作符 |
---|---|
将每个元素映射函数产生新的数据 | map |
将数据流中每个元素映射为同一数据 | mapTo |
提取数据流中每个数据的某个字段 | pluck |
产生高阶ObservabTime,bule对象 | windowTime, windowCount,windowWhen,windowToggle,window |
产生数组构成的数据流 | bufferTime,bufferCount,bufferWhen,bufferToggle,buffer |
映射产生高阶Observable对象并合并 | concatMap, mergeMap, switchMap, exhaustMap |
产生规约运算结果组成的数据流 | scan, mergeScan |
map
1 | const func = function(value) { |
pluck : 将上游的数据按字段取出。
1 | Observable.of({name: 'mark'}).pluck('name').subscribe(console.log); |
windowTime, bufferTime
1 | Observable.interval(1000).windowTime(2000).subscribe(v => v.subscribe(console.log)); |
window, buffer
1 | const source$ = Observable.timer(0, 100); |
concatMap, mergeMap, switchMap, exhaustMap
1 | Observable.of(1,2,3).concatMap(v => Observable.interval(100).take(3)).subscribe(console.log); |
groupBy
1 | Observable.of(1,2,3,4).groupBy(v => v %2 === 0).subscribe((v) => v.subscribe(console.log)); |
scan : 可以理解为可以持续传递数据的reduce
mergeScan : 返回Observable的scan
异常处理
功能 | 操作符 |
---|---|
捕获上游的错误 | catch |
当上游产生错误时重试 | retry,retryWhen |
无论是否出错都要进行一些操作 | finally |
catch
1 | Observable.of(1,2,3,4).map((v) => { |
retry, retryWhen
1 | Observable.of(1,2,3,4).map((v) => { |
finally : 同Javascript的finally方法。
多播
功能 | 操作符 |
---|---|
灵活选取Subject对象进行多播 | multicast |
只多播数据流中最后一个数据 | publishLast |
对数据流中给定数量的数据进行多播 | publishReplay |
拥有默认数据的多播 | publishBehavior |
Cold Observable: 每次subscribe都产生一个全新的数据序列的数据流。
Hot Observable: 真正的数据源与Observer没有关系,达到统一数据源的效果(fromEvent, fromPromise)。
1 | const subject = new Subject(); |
subject可以有多个数据源,起到的作用就是把多个数据源的内容汇总到一个Observable中去。
multicast
1 | const tick$ = Observable.interval(1000).take(10); |
publish
1 | const publish = (selector) => { |
share
1 | const share = () { |
publishLast,AsyncSubject
1 | function publishLast() { |
publishReplay, replaySubject
1 | function publishReplay(bufferSize = Number.POSITIVE_INFINITY, windowTime = Number.POSITIVE_INFINITY) { |
publishBehavior, BehaviorSubject
1 | function publishBehavior(value) { |
Scheduler
Scheduler 可以作为创造类和合并类操作符的函数使用,Rx还提供了observeOn和subscribeOn两个操作符,用于在管道任何位置插入Scheduler
1 | console.log('before'); |