RxJS 博大精深,看了好几篇文章都没有明白.
范围牵扯到了函数响应式开发去了… 函数式, 响应式…
唉…不过日子还是得过…混着过先呗
我目前所理解的很浅, 大致上是这样的概念.
1.某些场景下它比promise好用, 它善于过滤掉不关心的东西.
2.它是观察者模式 + 迭代器模式组成的
3.跟时间,事件, 变量有密切关系
4.世界上有一种东西叫 “流” stream, 一个流能表示了一段时间里,一样东西发生的变化.
比如有一个值, 它在某段时间里从 “我” 变成 “你” 再变成 “他”.
而我们可以对这个流进行观察,所以只要它发生变化,我们就会发现然后做任何事情。
5.站在游览器的角度, 服务器推送数据过来, 用户操作界面, timer 都是我们关心的流.
好,来看例子.
我们通过 new Subject 来创建流. 也可以使用 new EventEmitter 或者 BehaviorSubject. 这些都继承了 Subject
EventEmitter 是 ng2 提供的
BehaviorSubject 可以填入初始值
1 2 |
import { Subject } from "rxjs/Subject"; private textEmitter: Subject<string> = new Subject(); |
要改变流中的值,我们使用 .next(value), 这个是迭代器的概念咯
1 2 3 4 |
keyup(value : string) { this.textEmitter.next(value); } |
那么订阅是这样的
1 2 3 4 5 6 7 8 9 10 |
ngOnInit() { this.text$ = this.textEmitter .debounceTime(500) .distinctUntilChanged() .switchMap(v => this.getDataAsync(v)); this.text$.subscribe((value) => { console.log(value); }); } |
input keyup 性能优化, 我们通常会写一个 timeout + cleartimeout 的方式, 这个 debounceTime 就是干这个的
流更新结束后 500ms 才会通知观察者
distinctUntilChanged 是说只有当值和上一次通知时的值不一样的时候才通知观察者
.map 和 .switchMap 都是用来对值进行处理的, 这个和 array.map 概念是一样的
而 .map 和 .switchMap 的区别是 .swichMap 处理那些返回 Observeable 的值
1 2 3 4 5 6 7 8 9 |
getDataAsync(value : string): Observable<string> { let subject = new Subject(); setTimeout(() => { console.log("after 2second"); subject.next(value + "final"); }, 2000); return subject; } |
如果我们使用 map 的话,它会直接返回 “subject” 这个对象, 而如果用 switchMap 它会返回这个 subject 对象的响应值.
1 2 |
<input type="text" #input (keyup)="keyup(input.value)" /> <p>{{ text$ | async }}</p> |
ng2 提供了一个 async Pipe, 它会监听左边这个 text$ stream. 后面加一个 $ 符号通常用来表明这是一个 stream.
还有一个常用的功能是 combineLatest
就是可以同时监听多个流,只要其中一个有变动,那么所有的最新值都会发布出去, 可以用来实现依赖属性.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
@Component({ selector: "compute-property", template: ` <input type="text" #input1 (keyup)="text1.next(input1.value)" /> <input type="text" #input2 (keyup)="text2.next(input2.value)" /> {{ result$ | async }} ` }) export class ComputePropertyComponent implements OnInit { text1: BehaviorSubject<string> = new BehaviorSubject<string>("a"); text2: BehaviorSubject<string> = new BehaviorSubject<string>("b"); result$: Observable<string>; constructor() {} ngOnInit() { this.result$ = Observable.combineLatest(this.text1, this.text2).map(values => { return values[0] + " " + values[1]; }); } } |
还有 bufferCount, bufferTime 也是常用到
1 2 3 4 5 6 7 8 9 |
text: Subject<number> = new Subject<number>(); ngOnInit() { this.text.bufferCount(2) .subscribe(v => console.log(v)); //[v1,v2] 存够 count 了就发布 this.text.bufferTime(2000) .subscribe(v => console.log(v)); //[v1,v2,...]把 2 秒内的所有 next value 放进来 } |
Observable.of 可以简单的返回一个默认值
1 |
Observable.of<string>("").subscribe(v => console.log(v)); |
rxjs 整个文档非常大,要按需加载.
通常做法是为项目开一个 rxjs-operators.ts
1 2 3 4 5 6 7 8 9 10 11 12 13 |
import 'rxjs/add/observable/throw'; import 'rxjs/add/observable/combineLatest'; import 'rxjs/add/observable/from'; import 'rxjs/add/observable/of'; import 'rxjs/add/operator/catch'; import 'rxjs/add/operator/debounceTime'; import 'rxjs/add/operator/distinctUntilChanged'; import 'rxjs/add/operator/map'; import 'rxjs/add/operator/switchMap'; import 'rxjs/add/operator/toPromise'; import 'rxjs/add/operator/startWith'; import 'rxjs/add/operator/bufferCount'; import 'rxjs/add/operator/bufferTime'; |
放入常用的方法
然后再 app.module.ts 里面导入它
1 |
import './rxjs-operators'; |
默认情况下,observable是 not share
1 2 3 4 5 6 7 |
let sub = new Subject(); let obs = sub.map(v => { console.log("ajax call"); }); obs.subscribe(v => console.log("subscribe 1")); obs.subscribe(v => console.log("subscribe 2")); sub.next("value"); |
ajax发了两次,angular2的Http也是not share。
所以当我们有多个 subscribe 的时候要想一想是否我们需要 share
1 2 3 |
let obs = sub.map(v => { console.log("ajax call"); }).share(); |
调用一个 share 方法就可以了,或者是
1 2 3 |
let obs = sub.map(v => { console.log("ajax call"); }).publish().refCount(); |
效果是一样的.
默认情况下,observable 是cold。
意思是说只有在 subscribe 出现了以后才会启动. ( 当第一个 subscribe 出现时, observable 就会立刻启动了哦 )
1 2 3 4 5 6 7 |
let sub = new Subject(); let obs = sub.map(v => { console.log("ajax call"); }); sub.next("aaa"); //obs.subscribe(v => console.log("subscribe 1")); //obs.subscribe(v => console.log("subscribe 2")); |
ajax 不会触发.
如果我们希望它在没有 subscribe 的情况下触发的话, 可以这样写.
1 2 3 4 5 6 |
let sub = new Subject(); let obs = sub.map(v => { console.log("ajax call"); }).publish(); obs.connect(); sub.next("aaa"); |
至于什么情况下使用哪一种,我还没有实战,以后再说.
多一个例子解释:
1 2 3 4 5 6 7 8 9 10 |
let obs = Observable.create(observer => { console.log("observer run"); observer.next(Date.now()); }); obs.subscribe(v => console.log("1st subscriber: " + v)); obs.subscribe(v => console.log("2nd subscriber: " + v)); //observer run //1st subscriber: 1474649902498 //observer run //2nd subscriber: 1474649902501 |
no share. 所以 observer run 了 2 次.
1 2 3 4 5 6 7 8 |
let obs = Observable.create(observer => { console.log("observer run"); observer.next(Date.now()); }).share(); obs.subscribe(v => console.log("1st subscriber: " + v)); obs.subscribe(v => console.log("2nd subscriber: " + v)); //observer run //1st subscriber: 1474650049833 |
share 了, 所以 observer only run 1 次.
cold, 所以当第一个 subcribe 出现后 observer 立刻运行 -> .next 更新了 value -> 第一个 subcribe callback 被调用 -> 整个过程结束 -> 然后第2个 subcribe 注册 .. 由于是 share 所以 observer 没有载被触发. 第2个 subscribe callback 没有被调用.
延后触发的做法 :
1 2 3 4 5 6 7 8 9 10 |
let obs = Observable.create(observer => { console.log("observer run"); observer.next(Date.now()); }).publish(); obs.subscribe(v => console.log("1st subscriber: " + v)); obs.subscribe(v => console.log("2nd subscriber: " + v)); obs.connect(); //observer run //1st subscriber: 1474650370505 //2nd subscriber: 1474650370505 |
可以看到 .publish() 之后, subscribe 不再能激活 observer 了,而必须手动调用 .connect() 才能激活 observer.
这几个例子只是为了让你了解它们的玩法.
小结:
observer default is cold and not share.
cold 表示只有 subscribe 出现 observer 才会被激活.
not share 表示每一个 subscribe 都会激活 observer 链.
常用 :
1. finally 的使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
import 'rxjs/add/operator/finally'; this.http.get( "http://localhost:58186/api/products", { headers: new Headers({ "Accept": "application/json" })} ).finally(() => { console.log("finally"); //不管 success or error 最后都会跑这个 }).subscribe(response => { console.log("success"); }, response => { console.log("fail"); }, () => { console.log("success final"); }); //result : //success -> success final -> finally //fail -> finally |
2. 错误处理 throw catch
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
import 'rxjs/add/operator/catch'; import { Observable } from "rxjs/Observable"; import 'rxjs/add/observable/throw'; this.http.get( "http://localhost:58186/api/products", { headers: new Headers({ "Accept": "application/json" }) }) .map(r => r.json()) .catch((r) => { if ("1" == "1") { //do something ... return null; //catch 了在返回真确 } else { return Observable.throw("error"); //catch 了继续返回错误 } }) .subscribe( r => console.log(r), r => { console.log("fail") } ); |
3. previous / current value
用 .pairwise()
1 2 3 4 5 6 7 8 |
let userSubject = new BehaviorSubject<string>("default value"); let user$ = userSubject.asObservable().pairwise(); user$.subscribe(([before, after]) => { console.log(before), console.log(after); }); userSubject.next("super"); userSubject.next("ttc"); //result : //["default value","super"] //["super","ttc"] |