rxjs中的自定义pipe用法

这些天在写一些功能的时候,希望可以把一些方法改写为函数式调用的链式写法,但是这些操作很多是基于Promise的异步调用,后来发现可以基于rxjs的自定义pipe很好的解决这个问题 ,既可以保障函数调用足够简单,不产生回调地狱类似的问题,由可以很好的生成异步任务队列。

有这样一组任务:

1
2
3
任务A 
任务B
任务C

任务A,B,C 之间是完全异步的,但是需要按顺序执行,这是当然可以用await来完成,但还有时需要数据在A,B,C之间进行传递,可能写成这样:

1
2
3
await 任务A(result)
await 任务B(result)
await 任务C(result)

这是可以通过一个result对象来存储返回的结果,但是如果在某个任务失败的情况下来终止任务就需要添加throw操作,当然我希望可以写成这样:

1
2
3
4
x
.任务A()
.任务B()
.任务C()

但是在保障任务顺序执行和终止上需要还多额外的工作,后来想到了rxjs的pipe:

1
2
3
4
5
obserable.pipe(
任务A(),
任务B()
任务C()
)

这样的调用方式刚好满足我的需要,下载只需要编写对应的自定义pipe即可。

自定应pipe需要返回一个函数<T>(source: Observable<V>) => Observable<V>

这个函数中传入的Observable用来传入pipe目标observable中的数据,返回的Observable用来控制pipe的状态,来控制pipe是否继续向下执行,如任务B发生异常是执行error方法,就会终止后续任务的执行,这样满足了我们在pipe进行异步操作的需求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public call(handler: (arg) => Promise<any>) {
return <T>(source: Observable<any>) =>
new Observable<any>(observer => {
return source.subscribe({
next: async (data) => {
handler(data)
.then(() => {
observer.next(tx)
})
.catch(err => {
observer.error(err)
})
.finally(() => loading.dismiss())
},
error(err) {
observer.error(err)
},
complete() {
observer.complete()
}
})
})
}

传入的handler是一个promise操作,这样就可以把一个Promise方法封装到这个pipe中。

为了可以进行调用,我们需要在管道外加一层

1
2
3
4
5
public transaction(result): Observable<Transaction> {
return Observable.create(observer => {
observer.next(result)
})
}

result可以作为数据结果共享的载体,这样多个自定义pipe之间就通过result共享数据。

调用时就可以像这样方便的调用了:

1
2
3
4
transaction(result).pipe(
this.call(...promise操作A),
this.call(...promise操作B)
)