RxJS – Part 4 – Operators
All posts in RxJS series:
RxJS Operators introduction
This time we are going to talk about operators. If you are not familiar with RxJS you should read first post in the series – introductory post.
There are many different ways to deal with asynchronous data. We could use callback functions or promises. Also, we could use generators and make our life easier by writing code that deals with asynchronous data in a synchronous manner. However, when we are using RxJS, operators are primary tool for manipulating and dealing with data. Consequently, main benefit of RxJS is various range of operators it supports.
Some of the operators transform and filter data, some of them change the timing of data, some of them let us create observables from nearly anything. Furthermore, there are operators that enable us to merge and combine data from multiple streams. Also, we have operators that help us deal with error handling. Especially relevant is that all of the operators are composable. That means we can combine operators with others to establish a nice and readable way to deal with asynchronous data.
Lets see some examples!
Array operators with Observable
Lets say we have a source of data – array of numbers
[ 1, 2, 3 ] . We will make an observable from it. Also, we want to do some manipulation on that observable.
[ 2, 4, 6, 8 ] . After that, we filter out the array and get only those numbers that are greater than 5. As a result we get an array: [ 6, 8 ]. Finally, we use reduce to combine values into one value. Reduces acts as a sum function.
Map and flatMap
Map in RxJS behaves like
The map() method creates a new array with the results of calling a provided function on every element in this array.
map() takes an array and a function. It will create a new array. It will apply given function for every element in the array. Result of function applied against element in array gives us a new element. That element is then stored into new array. Finally,
map returns us a new array containing new items.
Lets see an example in RxJS:
And we get the following output:
We can map the input ( in our case number ) inside of
map() function not only to the same type but also to anything we want. We can map it to an object, boolean value, array, etc. Lets map our number to array:
Quite easy. We multiplied every number with 1, 2, 3 and stored those values in an array and then we returned that. What if we wanted to get the numbers and not arrays. We need to flatten the array. We can do that with flatMap:
Now we get flattened result:
That is one use case where
flatMap can be useful.
Lets see another example that can be quite common when we are dealing with RxJS. That is the case when the function inside of
map() returns an Observable and not a number.
We will get the following output:
Now comes the
flatMap, it can help us transform this ScalarObservable into a regular value. We have to change only one line of the code:
And we get the following output:
Reduce and scan
So lets sum numbers from 0 to 3. We can simply use interval and reduce to achieve that.
We get the following output:
Notice that timer ends after 400 ms. That implies that
reduce waited for completion of observable. It waited 4 x 100 ms and after that we got the sum as the result.
Scan can do the same thing as
reduce – go through every element and give us access to accumulated value and current item of collection.
scan and subscribing to it we can see it emits a value for us for every item. Meaning that subscriber will see results for every iteration.
And the output is now:
Clicks and buffers
Lets see how we can interact with a button click and add values to HTML div.
You can try out the example here – JS Bin.
Buffer takes an Observable as the parameter – opening. Every time Observable that was passed in buffer emits a value
buffer will redirect those values to forEach. In other words it will flush the buffer into forEach. What we want is to add values to HTML only after 1 second has passed since last click. We can achieve that by using throttleTime operator:
We are buffering all the clicks but when there is a delay of more than 1 second between clicks we flush the buffer and add the buffer result to HTML. After first click there will not be any values added to the HTML but after one second passes and we click second time we should see some values.
Let it flow!
Lets see how we can use one stream to toggle ( on/off ) another stream.
We have a source that keeps emitting values with help interval and scan operator. We make an observable from checkbox and its ‘change’ event. Whenever checkbox is checked or unchecked we
checked Observable will map that to the boolean value. Then we filter our
isChecked stream to filter only when a checkbox is checked ( when value from checked is
true ). Then we use the
flatMap operator to flatten the Observable of Observables as we saw in previous example. Now we have the problem that once we check the checkbox we can not stop the stream from running.
We can solve our problem by using another handy operator –
This operator takes values from a source Observable until some other Observable sequence or Promise produces a value. Once a single value has been produced from another source it will stop taking values from its source.
We will simply call
takeUntil on our source inside of flatMap and we will pass it
checked source of stream. That means that once
checked is changed it will stop streaming our source.
Live sample can be found at JSBin.
In RxJS with help of operators we are doing things declaratively and code is much more self descriptive.
We saw how
flatMap can help us get the clean output in two different scenarios.
takeUntil() function is a convenient way of completing a sequence when another Event occurs. We’ve learned that Observable sequences are much more powerful than raw events, because they can complete. The
takeUntil()functions are powerful enough to ensure that we never have to unsubscribe from another event again! This reduces the risk of memory leaks and makes our code more readable.
Also published on Medium.