RxJS – Part 4 – Operators
All posts in RxJS series:
RxJS Operators introduction
This time we are going to talk about operators. If you are not familiar with RxJS you should read first post in the series – introductory post.
There are many different ways to deal with asynchronous data. We could use callback functions or promises. Also, we could use generators and make our life easier by writing code that deals with asynchronous data in a synchronous manner. However, when we are using RxJS, operators are primary tool for manipulating and dealing with data. Consequently, main benefit of RxJS is various range of operators it supports.
Some of the operators transform and filter data, some of them change the timing of data, some of them let us create observables from nearly anything. Furthermore, there are operators that enable us to merge and combine data from multiple streams. Also, we have operators that help us deal with error handling. Especially relevant is that all of the operators are composable. That means we can combine operators with others to establish a nice and readable way to deal with asynchronous data.
Lets see some examples!
Array operators with Observable
Lets say we have a source of data – array of numbers [ 1, 2, 3 ]
. We will make an observable from it. Also, we want to do some manipulation on that observable.
Map, filter, and reduce all do the same logic as they would for an JavaScript array. We use map to make new set of data. Therefore, we multiply ever member of array with 2 – we get an array [ 2, 4, 6, 8 ]
. After that, we filter out the array and get only those numbers that are greater than 5. As a result we get an array: [ 6, 8 ]. Finally, we use reduce to combine values into one value. Reduces acts as a sum function.
Important distinction is that in JavaScript with map, filter and reduce we get a new array for every operation. However, in RxJS we get only one output after all actions have been applied to source.
Map and flatMap
Map in RxJS behaves like map
function in JavaScript:
The map() method creates a new array with the results of calling a provided function on every element in this array.
So JavaScript map()
takes an array and a function. It will create a new array. It will apply given function for every element in the array. Result of function applied against element in array gives us a new element. That element is then stored into new array. Finally, map
returns us a new array containing new items.
Lets see an example in RxJS:
And we get the following output:
We can map the input ( in our case number ) inside of map()
function not only to the same type but also to anything we want. We can map it to an object, boolean value, array, etc. Lets map our number to array:
Output is:
Quite easy. We multiplied every number with 1, 2, 3 and stored those values in an array and then we returned that. What if we wanted to get the numbers and not arrays. We need to flatten the array. We can do that with flatMap:
Now we get flattened result:
That is one use case where flatMap
can be useful.
Lets see another example that can be quite common when we are dealing with RxJS. That is the case when the function inside of map()
returns an Observable and not a number.
We will get the following output:
Now comes the flatMap
, it can help us transform this ScalarObservable into a regular value. We have to change only one line of the code:
And we get the following output:
Reduce and scan
So lets sum numbers from 0 to 3. We can simply use interval and reduce to achieve that.
We get the following output:
Notice that timer ends after 400 ms. That implies that reduce
waited for completion of observable. It waited 4 x 100 ms and after that we got the sum as the result.
Scan
can do the same thing as reduce
– go through every element and give us access to accumulated value and current item of collection.
However, using scan
and subscribing to it we can see it emits a value for us for every item. Meaning that subscriber will see results for every iteration.
And the output is now:
Clicks and buffers
Lets see how we can interact with a button click and add values to HTML div.
You can try out the example here – JS Bin.
Buffer takes an Observable as the parameter – opening. Every time Observable that was passed in buffer emits a value buffer
will redirect those values to forEach. In other words it will flush the buffer into forEach. What we want is to add values to HTML only after 1 second has passed since last click. We can achieve that by using throttleTime operator:
We are buffering all the clicks but when there is a delay of more than 1 second between clicks we flush the buffer and add the buffer result to HTML. After first click there will not be any values added to the HTML but after one second passes and we click second time we should see some values.
Let it flow!
Lets see how we can use one stream to toggle ( on/off ) another stream.
And HTML:
We have a source that keeps emitting values with help interval and scan operator. We make an observable from checkbox and its ‘change’ event. Whenever checkbox is checked or unchecked we checked
Observable will map that to the boolean value. Then we filter our isChecked
stream to filter only when a checkbox is checked ( when value from checked is true
). Then we use the flatMap
operator to flatten the Observable of Observables as we saw in previous example. Now we have the problem that once we check the checkbox we can not stop the stream from running.
We can solve our problem by using another handy operator – takeUntil
.
Rx.Observable.prototype.takeUntil(other)
This operator takes values from a source Observable until some other Observable sequence or Promise produces a value. Once a single value has been produced from another source it will stop taking values from its source.
We will simply call takeUntil
on our source inside of flatMap and we will pass it checked
source of stream. That means that once checked
is changed it will stop streaming our source.
Live sample can be found at JSBin.
Summary
In RxJS with help of operators we are doing things declaratively and code is much more self descriptive.
When we are using filter, map or reduce in JavaScript with arrays we are actually producing a new array every time. These arrays tend to be big and they need to be garbage collected and that can be a problem. With RxJS we don’t create new collections, we simple apply multiple operators on a source and output a result.
We saw how flatMap
can help us get the clean output in two different scenarios.
takeUntil(
) function is a convenient way of completing a sequence when another Event occurs. We’ve learned that Observable sequences are much more powerful than raw events, because they can complete. The take()
and takeUntil()
functions are powerful enough to ensure that we never have to unsubscribe from another event again! This reduces the risk of memory leaks and makes our code more readable.That is all for now. In next post we will talk about error handling and see how we can handle errors appropriately with RxJS. After that we will talk more about operators.
Also published on Medium.