RxJS – Part 6 – Chat application with RxJS

This entry is part 6 of 6 in the RxJS series


In the first post of RxJS series we talked about reactive programming and benefits or RxJS. Second post covered Observable and how we can effectively use and manipulate Observables. After that, in third post we tried to get familiar with RxJS operators. We tried to briefly cover the error handling subject in the fourth post.

This time we will dive in deep. We will use RxJS, Angular, express (Node.js) & Socket.IO to make a chat application. We will get to see how useful RxJS can be in this scenario. For the purpose of making things smooth and easy we will be using Angular CLI to generate basic client structure and get us a boilerplate for simplest working Angular application. On the back-end we will use Node.js with express and Socket.IO. Reasoning behind this is that Socket.IO is very easy to set up and work with. Furthermore, it provides both server and client side libraries. Socket.IO primarily uses WebSocket protocol to enable real-time bidirectional communication.

Lets get started!


Using RxJS to handle possible chat abuse

If you skipped to this post to see what we can do with RxJS you can browse the current code at this commit – ‘using moment.js to output messages with timestamp’.

Lets keep our chat functionality simple for now. We could definitely extend what we have at the moment. We should definitely add usernames. Furthermore, we could add chat rooms. However, for purpose of learning RxJS we will keep it as it is at the moment. Because, that is all we need for now to play around with RxJS. This is all nice and fancy, we got something like chat working, messages are flowing through from one side to another. But what happens when an evil abuser comes to our chat and starts spamming the chat? We are doomed!

Fear not! RxJS is here!

For clarity we will add something very useful for our purpose yet very trivial to implement. Timestamp for every message. I will install very handy library for dealing with dates – Moment.js. I will import the moment directly to our component and change  the code where we push the message to messages array.

If you skipped to this chapter to see what we can do with RxJS you can browse the current code at this commit – ‘using moment.js to output messages with timestamp’.


Using operators with Observable

User connects to our sweet chat and starts spamming. She/he keeps sending messages. It feels like dozens of messages per second. Classical chat flooding from a user.

We could do quite a few things, but lets look at throttleTime() operator:

throttleTime(duration: number): Observable<T>

Emits a value from the source Observable, then ignores subsequent source values for duration milliseconds, then repeats this process.

We do not want to delay messages, we want to ignore them for a short period of time. Lets see how our code looks now:

Another problem arises when user starts spamming the same message. Over and over. However, we can simply use distinctUntilChanged() operator.

distinctUntilChanged(compare: function): Observable

Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item.


Prevent empty messages and add message counter

Furthermore, we can prevent user to send empty messages. Also, we could disallow messages that contain only white space. This is where we use filter() operator. Another thing we can do is to keep the count of messages that users sent. We can append that number to content of every message. Lets take a look at the code:


Secret codes to start and finish conversation

For a moment imagine we have a chat room and that no one can send a message until an admin or a user with secret code starts the conversation. And we will enable regular user to enter the secret code in form of regular message. Lets take a look at skipWhile operator:

Returns an Observable that skips all items emitted by the source Observable as long as a specified condition holds true, but emits all further source items as soon as the condition becomes false.


We will add secretCode property and give it initial value inside of AppComponent constructor.


We could also use similar technique to print messages until another secret code is entered to terminate the conversation. This is where we will use takeWhile operator:

Emits values emitted by the source Observable so long as each value satisfies the given predicate, and then completes as soon as this predicate is not satisfied.

Takes values from the source only while they pass the condition given. When the first value does not satisfy, it completes.


Final code for our AppComponent:


That is all for now! You can find complete code at my rxjs-chat repository.



RxJS Series Navigation: Previous post: << RxJS – Part 5 – RxJS error handling
Next post:

Ibrahim Šuta

Software Consultant interested and specialising in ASP.NET Core, C#, JavaScript, Angular, React.js.