javascript代码实例教程-RxJS入门(3)----深入Sequence

发布时间:2019-01-15 发布网站:脚本宝典
脚本宝典收集整理的这篇文章主要介绍了javascript代码实例教程-RxJS入门(3)----深入Sequence脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。
小宝典致力于为广大程序猿(媛)提供高品质的代码服务,请大家多多光顾小站,小宝典在此谢过。

在之前的系列中,我吧sequence翻译成了序列,题目我就不翻译了,感觉翻译过来的有点失去了点啥。其他好多地方都是用stream(流)来比喻和形容。

可视化Observable

RxJS编程中你已经学了一些关于使用最频繁的操作符了。讨论什么是sequence的操作符感觉有点抽象。为了帮助开发者更容易的理解操作符,我们使用Marble diagrams(弹子图?翻译估计有问题)来标准可视化。他们很形象的代表了异步的数据流,并且你可以在其他的rxjs资料里面找到。

让我们看下range操作符,它返回一个发射特定范围内整数的Observable:如下图所示:

 

javascript代码实例教程-RxJS入门(3)----深入Sequence

 

这个长箭头代表着这个Observable,x轴代表者时间,每一个圈代表着内部调用OnNext()时Observable发射的值。在产生第三个值之后,range调用了上图中用垂直线表示的onCompleted函数。

让我们看下包含若干Observable的例子。merge操作符操作两个不同的Observable,并且返回合并值的新的Observable。

interval操作符返回一个在规定的毫秒间隔时间内产生增加数的Observable。 在下面的例子中,我们将会合并两个使用了interval的不同的Observable,这两个Observable都是在不同的时间间隔而产生值:

VAR a = Rx.Observable.interval(200).map(function(i) {

return 'A' + i;

});

var b = Rx.Observable.interval(100).map(function(i) {

return 'B' + i;

});

Rx.Observable.merge(a, b).subscribe(function(x) {

console.LOG(x);

});

结果如下:

》B0, A0, B1, B2, A1, B3, B4…

merge操作符的示意图如下:

 

javascript代码实例教程-RxJS入门(3)----深入Sequence

 

这里,沿着y轴虚线箭头指向了A和B序列中的每个元素变化之后的最终结果。C序列代表着作为结果的Observable,它包含了A和B序列合并后的元素。如果不同的序列的元素同时发射,合并序列的元素顺序是随机的。

基础的序列(sequence)操作符

几乎在Rxjs转变Observable的数十个操作符里,也有在任何一门语言里使用最多的有收集-处理能力的如:map、filter、reduce。在JavaScrpIT中,你可以在数组的实例中找到这些操作符(函数)。 Rxjs遵循着JavaScript的约定,你有机会可以找到几乎和数函数作一样的操作符,实际上,我们将会展示数组和Observable的真实使用,用以呈现这两者的API是有多么的相似。 map

map是用的最多的序列转化操作符。它需要一个Observable和一个函数,并且把函数应用于Observable的每一个值。它返回一个新的转化后值的Observable。

 

javascript代码实例教程-RxJS入门(3)----深入Sequence

 

在上面两种情况,src都是不会改变的。

这个代码,和下面的代码,使用logValue定义:

var logValue = function(val){console.log(val)};

他可能是我们传递给map去做一些异步计算改变那个值的函数。在某些状况下,map可能不会如期盼的那样工作,对于这些状况,更好的办法是使用flatMap操作符。

filter

filter需要一个Observable和一个函数,并且它会使用这个函数去测试Observable中的每一个元素。它将会返回一个序列,这个序列中所的元素都是那个函数返回true的值。

 

javascript代码实例教程-RxJS入门(3)----深入Sequence

 

reduce

reduce(也做fold)需要一个Observable并且返回一个新的只包含一个单个item的Observable,这个单个的item是某个函数应用到每一元素的的结果。这个函数接受当前元素和这个函数上一次调用的结果。

 

javascript代码实例教程-RxJS入门(3)----深入Sequence

 

reduce是一个处理某个序列很强大的操作符。实际上它是一种被称为聚合操作符(aggregate operators)的一个完整子集的基础实现。 Aggregate OPErators 聚合操作符处理某个序列并且返回单个的值。例如:Rx.Observable.First需要一个Observable和一个可选的断言函数并且返回第一个满足断言函数的元素。 计算某个序列的平均值也是一个聚合操作中。Rxjs提供了average操作的实例,但是有这个选着的缘故,我们使用reduce来实现下。每一个聚合操作都能被仅仅使用reduce来实现。

var avg = Rx.Observable.range(0, 5) .reduce(function(prev, cur) { return { sum: PRev.sum + cur, count: prev.count + 1 }; }, { sum: 0, count: 0 }) .map(function(o) { return o.sum / o.count; }); var subscription = avg.subscribe(function(x) { console.log('Average is: ', x); });

》Average is: 2

面代码的解释,我就不翻译了,直接看那就能看懂。设想下,如若现在我们要计算一个步行者的平均速度,用reduce很好实现,但是假设,在时间轴上,步行者永远走下去,那么像reduce一样的聚合操作符将永远不会调用它的观察者(obeserver)的onNext函数。 很高兴的是,Rxjs团队已经想到这种情况了,并且给我们提供了一个scan操作符,它扮演着像reduce的角色,但是它会发送每一个中间的结果。代码如下:

var avg = Rx.Observable.interval(1000)

.scan(function (prev, cur) {

return {

sum: prev.sum + cur,

count: prev.count + 1

};

}, { sum: 0, count: 0 })

.map(function(o) {

return o.sum / o.count;

});

var subscription = avg.subscribe( function (x) {

console.log(x);

});

使用如上方式,我们就可以聚合某个消耗时间长或者没有时间限制的序列了。在上面的代码中,我们每秒产生了一个增量的整数,并且用scan取代了先前的reduce。我们就以每秒为间隔取得了到目前为止所有的平均值。

- flatMap

- 如果你又一个Observable它的结果是许多嵌套的Observable将则怎么做?大部分的时间,你想到的就是统一这些嵌套的Observable的元素到一个单个的序列中。这也是flatMap所要明确干的事。

- flatMap操作符需要一个Observable参数,这个参数的元素也是Observable,并且返回只有一个孩子Observable的平坦值的Observable。

 

@H_360_153@

 

我们可以看到到每一在A(A1,A2,A3)中的元素也是Observable序列,一旦我们对A应用flatMap转化功能,我们将获得一个Observable,它包含所A所有不同孩子的所有元素。

flatMap是一个强大的操作符,但是它比起我们目前学到的其他操作符都要难理解些,把它认为是这些Observable的concatAll()函数。 contcatAll是需要一个数组的数组函数,并且返回一个平坦的单一的数组,这个数组包含所有子数组的值,而不是这些子数组本身。我们可以使用reduce去建一个这样的函数:

function concatAll(source) {

return source.reduce(function(a, b) {

return a.concat(b);

});

}

我们可以这样使用它:

concatAll([[0, 1, 2], [3, 4, 5], [6, 7, 8]]);

// [0, 1, 2, 3, 4, 5, 6, 7, 8]

flatMap做同样的事,但不是平坦的arrays而是observable…… Canceling Sequences 在Rxjs中,我们可以取消正在运行的Observable。相比起回调函数或者promise(有部分promise实现支持取消)这些异步交互的方式,这就是优势。 有隐式或者是明确的这两者主要的取消Observable的方式。

1)明确的取消:dispose

Observable本身并没有取消方法。当我们订阅到一个Observable的时候我么就是获得了一个Disposable对象,这个dispose对象代表这个特殊的订阅。这样我们就可以在这个对象中调用dispose方法了,之后这个特殊的订阅就会停止从Observable中接受通知。

在接下来的例子中,我们订阅了两个counter的Observable,counter每秒发射一个自增的整数。两秒后我们取消第二个订阅(subscription),之后我们看到它的输出停止了,但是第一个订阅者的输出还是在进行的。

var counter = Rx.Observable.interval(1000);

var subscription1 = counter.subscribe(function(i) {

console.log('Subscription 1:', i);

});

var subscription2 = counter.subscribe(function(i) {

console.log('Subscription 2:', i);

});

setTimeout(function() {

console.log('Canceling subscription2!');

subscription2.dispose();

}, 2000);

结果将会如下:

》Subscription 1: 0

Subscription 2: 0

Subscription 1: 1

Subscription 2: 1

Canceling subscription2!

Subscription 1: 2

Subscription 1: 3

Subscription 1: 4

2)通过Operator的隐式取消

大部分的时候,操作符将会为你自动取消subscription。像range或者take,当序列结束或者满足某个操作的条件时,他们将会取消订阅。更高级的如withLastestFrom或者flatMapLastest等操作符,当它们动态创建Observable时,将会在需要订阅的时候内部自动地创建和销毁。

当我们使用Observable包装外部没有提供取消功能的API接口时,当取消Observable时,Observable将会停止发送通知,但是里面的API将不会被注销。例如你使用一个包装着promise的Observable,当取消时Observable会停止发射,但是里面的promise不会停止。

如下代码,我们试图取消关联到某个包装了promise Observable的订阅,同时,我们也把promise按照传统的方式了进行一个操作。这个promise将会在5秒后被执行,但是取消订阅会立马被执行。

var p = new Promise(function(resolve, reject) {

window.setTimeout(resolve, 5000);

});

p.then(function() {

console.log('Potential side effect!');

});

var subscription = Rx.Observable.fromPromise(p).subscribe(function(msg) {

console.log('Observable resolved!');

});

subscription.dispose();

5秒之后我们将会看到:

》Potential side effect!

如果我们取消到Observable的订阅,它将会很有效的停止重Observable接受通知。但是这个promise的then方法还会继续执行,这展示给我们:取消Observable并不会取消它内部的promise。 因此,在Observable内部使用外部的API接口必须知道里面的细节是很重要的。你可以想象:你已经取消了一个序列,但是它内部的一些api任然在运行,并且给你的程序带来一些副作用,这些错误真的很难被捕获。

Handling Errors

在回调函数中,我们之所以可以使用传统的try/catch机制,是因为它是同步的。由于它运行在任何异步代码之前,所以它将捕获不到任何错误。 在回调函数中的解决方式是:传递这个错误作为回调函数的一个参数,这样虽然可以起到作用,但是会使代码变得相当脆弱。 下面让我们看看Observable是如和捕获错误: The onError Handler 当我们说起observer时候,必须记住三个函数:onNext、oncompleted、onError。onError是Observable中有效处理错误的关键。 为了展示它如何工作,如下将会有一个简单的函数,它需要json串的数组并且返回一个Observable,这个Observable发送用JSON.parse转化那些json串后的对象。

function getJSON(arr) {

return Rx.Observable.from(arr).map(function(str) {

var parsedJSON = JSON.parse(str);

return parsedJSON;

});

}

通过getJSON我们将会传递三个json串,第二个串会包含一个语法错误,因此JSON.parse无法解析它。接着我们通过提供onNext和onError处理器来订阅那个结果:

getJSON([

'{"1": 1, "2": 2}',

'{"success: true}', // Invalid JSON string

'{"enabled": true}'

]).subscribe(

function(json) {

console.log('Parsed JSON: ', json);

},

function(err) {

console.log(err.message);

}

);

结果如下:

》Parsed JSON: { 1: 1, 2: 2 }

JSON.parse: unterminated string at line 1 column 8 of the JSON data

针对数组的第一个结果 Observable发射了一个JSON转译对象,但是第二个会抛出一个异常,onError处理器将会捕获这个异常,并打印它。默认的行为是,无论什么时候,只要异常一发生,Observable将会停止发射,并且onCompleted将不会被调用。 捕获异常 到目前为止我们已经知道如何侦测一个异常,并且去做些什么。但是我们还没没能响应我们接着要做的事。Observable实例提供了一个catch操作符,它允许我们对一个Observable的错误进行响应之后而继续进行其他Observable。 catch操作符需要一个以异常为入参的Observable或者函数,它返回另外一个Observable。在我们的例子中,由于在原始的Observable中有错误,我们想Observable发射一个包含异常属性的JSON对象。

function getJSON(arr) {

return Rx.Observable.from(arr).map(function(str) {

var parsedJSON = JSON.parse(str);

return parsedJSON;

});

}

var caught = getJSON(['{"1": 1, "2": 2}', '{"1: 1}']).catch(

Rx.Observable.return({

error: 'There was an error parsing JSON'

})

);

caught.subscribe(

function(json) {

console.log('Parsed JSON: ', json);

},

// Because we catch errors now, `onError` will not be executed

function(e) {

console.log('ERROR', e.message);

}

);

上述代码中,我们创建了一个叫做caught的新的Observable,它使用catch操作符捕获初始Observable里的异常。如果有异常,它将继续这个序列通过使用仅仅发射一个异常属性item的Observable来描述这个错误,结果如下:

》Parsed JSON: Object { 1: 1, 2: 2 }

Parsed JSON: Object { error: “There was an error parsing JSON” } 下面就是catch操作符的marble diagram(弹子图):

 

javascript代码实例教程-RxJS入门(3)----深入Sequence

注意到“X”在序列上的,它代表着一个异常(错误)。不同的Observable值的形状:三角形代表着这些值来自另外一个Observable,在这里,那个Observable是我们异常情况下返回的Observable。 catch对序列中的异常的交互来说非常有用,并且它的好多行为跟传统的try-catch块是很相似的。在好多情况下,忽略序列中某个项发生的异常并且让这个序列继续下去是很方便的。在这些情况下,我们可以使用retry操作符。 Retrying Sequences 有些时候仅仅是发生错误而不需要我们去做些什么。例如,一个由于用户零星internet链接或者远程服务器宕机而导致的远程数据请求的超时,在这种情况下,这将会是一个很好的办法,如果我们一直请求我们需要的数据直到成功为止。那个retry做了如下操作:

 

//This will try to retrieve the remote URL up to 5 times.

Rx.DOM.get('/products').retry(5)

.subscribe(

function(xhr) { console.log(xhr); },

function(err) { console.error('ERROR: ', err); }

);

在上面的代码中,我们创建了一个函数,它返回了一个使用XMLHttpRequest向一个url重复获取内容的Observable。由于我们的链接有可能会不靠谱,我们在订阅之前增加了retry(5),确保在异常的情况下,在Observable挂起和报错之前会尝试5次。 当我们使用retry的时候,有两件事很重要: 1)如果我们不传任何参数,它将会无限次尝试,直到Observable结束并且没有异常。这对程序来说是非常危险的,一旦Observable一直报错的话。如果我们使用同步的多个Observable,它将会有同样无限循环的结果。 2)retry将会重新retry整个的Observable,即使某些项没有异常。因为每次retry,它都会重新运行,当我们处理的某些项的时候,将会导致意外的结果,这一点也很重要。

 

觉得可用,就经常来吧! 脚本宝典 欢迎评论哦! js脚本,巧夺天工,精雕玉琢。小宝典献丑了!

脚本宝典总结

以上是脚本宝典为你收集整理的javascript代码实例教程-RxJS入门(3)----深入Sequence全部内容,希望文章能够帮你解决javascript代码实例教程-RxJS入门(3)----深入Sequence所遇到的问题。

如果觉得脚本宝典网站内容还不错,欢迎将脚本宝典推荐好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。