A Pattern for RxJS and Meteor

A key innovation in the JavaScript world is reactive programming, or “Observables”. When I searched for examples of how to use RxJS with Meteor, I didn’t find anything, so in this post I will discuss a pattern that I discovered for using RxJS with Meteor.

What’s Meteor (www.meteor.com)

If you’re new to Meteor, it is a full-stack JavaScript environment. Key features are:

  • Uses MongoDB on the server and implements a small version of Mongo, called Minimongo, that runs in your browser
  • DDP (Distributed Data Protocol), which can automatically sync a subset of your Mongo database to your browser to achieve real-time features. When a change happens to a subset the client has expressed interest in (subscribed to), it just shows up in Minimongo. Importantly, you can also trigger the sync manually to avoid the performance penalties of real-time sync, making it more like a REST application without the work of defining the services.
  • RPC (remote procedure calls, implemented as Meteor methods) and a full Node.js web server, so you can always build apps the old-fashioned way if need be
  • Lots of officially supported options for the presentation tier (its own Blaze framework, React, Angular 1 and Angular 2)
  • An extensive set of default packages to handle data dependency reactivity (Tracker), account management, ECMAScript support, and much more
  • A huge set of third-party libraries (Atmosphere)
  • The latest version (1.3) includes
    • Support for ECMAScript 2015-style modules, making it a snap to access the huge set of libraries via NPM (Node Package Manager)
    • Improved mobile app development capabilities
    • Much-needed upgrades to the automated testing strategy
    • Improved documentation with Meteor Guide

RxJS

RxJS (Reactive Extensions for JavaScript) is a set of libraries for composing asynchronous and event-based programs using observable collections and Array Extras-style composition in JavaScript.

I am quite new to RxJS and the style of programming that it implements, so the descriptions that follow are based on my initial findings.  I’d love to get feedback in the comments section.

The problem that RxJS is addressing is how to manage asynchronous events. For example, if you call a server to fetch some data, the call is sent, your code continues to run, and then it takes some action when the results come back. Similarly, mouse movements, gestures and any other user input follow an asynchronous paradigm. Increasingly, events can come in from IoT (Internet of Things) devices. And, in the Meteor world, new data from changes to your database is delivered to your app.

Why is this a problem? Standard programming paradigms just don’t handle this very well. Commonly, callbacks are used, and any JavaScript developer soon ends up knee-deep in them. As more and more asynchronous APIs are incorporated into an app, the complexity increases, standard try/catch error handling doesn’t work, you spend more and more time fussing with the internal app state, and before long the app descends into callback hell and you’re spending precious time debugging race conditions.

It is also important to understand that there are many parallel efforts to address the problem. Most notably, there is a proposal to add observables directly to a future version of the JavaScript language. The laudable goal of all these initiatives is to make asynchronous programming as simple as normal synchronous programming.

It is also notable that leading frameworks are heavily embracing RxJS.  For example, it is a key component of Angular 2 and it is commonly used in React environments.

So, how does RxJS address the problem? RxJS includes the basic building blocks for watching, manipulating and responding to asynchronous events. An observable (which is like a stream) can be created and subscribed to. An observable is a lot like an array, but the contents occur over time instead of being in memory all at once.  Accordingly, RxJS implements a number of standard “array” manipulation capabilities, making it much like the JavaScript underscore or lodash libraries, but for streams.
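
To make the “array over time” comparison concrete, here is a minimal sketch. TinyStream is a hand-rolled stand-in written for this illustration so that it runs without any library installed; it is not the RxJS implementation, and real RxJS observables offer far more (errors, completion, disposal). The point is only that the same filter and map steps can be applied to values that arrive one at a time.

```typescript
// Hand-rolled stand-in for an observable; NOT the RxJS implementation.
type Listener<T> = (value: T) => void;

class TinyStream<T> {
  // Called once per subscriber; it pushes values whenever it likes.
  private producer: (emit: Listener<T>) => void;

  constructor(producer: (emit: Listener<T>) => void) {
    this.producer = producer;
  }

  subscribe(onNext: Listener<T>): void {
    this.producer(onNext);
  }

  map<U>(project: (value: T) => U): TinyStream<U> {
    return new TinyStream<U>((emit) => this.subscribe((v) => emit(project(v))));
  }

  filter(predicate: (value: T) => boolean): TinyStream<T> {
    return new TinyStream<T>((emit) =>
      this.subscribe((v) => {
        if (predicate(v)) emit(v);
      })
    );
  }
}

// Array: all values are in memory at once.
const fromArray: number[] = [1, 2, 3, 4]
  .filter((n) => n % 2 === 0)
  .map((n) => n * 10);

// Stream: the same operators, but values are delivered one by one.
// Here they arrive synchronously; a real source could emit on clicks or timers.
const received: number[] = [];
const clicks = new TinyStream<number>((emit) => {
  [1, 2, 3, 4].forEach(emit);
});
clicks
  .filter((n) => n % 2 === 0)
  .map((n) => n * 10)
  .subscribe((n) => received.push(n));
```

Both pipelines end up with the same values; the difference is that the stream version never needs the whole input to exist at once.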

Be prepared to do some unlearning. It took me considerable time and use to get comfy with JavaScript promises (which are a bit like RxJS’s little brother). Working with RxJS requires putting aside many approaches that are second nature to many developers. It’s a little like when object-oriented programming arrived on the scene. (Oops – I just dated myself.) The concepts are foreign and seem like a more complicated version of something that everyone knows. But after you’re used to the concepts and you don’t have to waste energy thinking through the basics, the value shines through. So be patient. A good starting point is to simply replace your use of promises with the use of RxJS.

RxJS and Meteor

OK, with that background out of the way, we’re finally ready to get into the meat of the article.

With Meteor, whenever you create a reactive data source (not to be confused with reactive RxJS), DDP ensures that your client copy of the data is automatically updated. The Blaze UI framework handles this for you and updates your UI when the data changes. However, I use Angular in the apps that I’ve been creating, and I wanted to explore using RxJS to update the user interface components when data changes.

The cursor.observeChanges Pattern

In this scenario, I had various places in the app where I wanted to display users’ names and avatars. Users might change their user name or avatar in the middle of a game, and all players’ displays needed to be updated.

Here are some code fragments and discussion.

 export class AccountTools {  
  private static loginStatusSubject:Rx.Subject<UserEvent> = new Rx.Subject<UserEvent>();
  private static userCursor:Cursor;  

In this TypeScript class, we declare an RxJS Subject and a Meteor Cursor as static members.

 static pushEvent(userEvent:UserEvent):void {
  // onNext returns nothing, so there is no value to return here.
  AccountTools.loginStatusSubject.onNext(userEvent);
 }

The class implements a method that sends an event (a UserEvent declared elsewhere) to any interested observer.
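
As a rough illustration of what a Subject does with that event, here is a minimal sketch. MiniSubject and SimpleDisposable are stand-ins written for this example so that it runs without RxJS installed. They only mimic the onNext, subscribe and dispose calls used in this post, and they ignore errors and completion.

```typescript
// Stand-ins for illustration only; NOT the RxJS Subject or Disposable.
interface SimpleDisposable {
  dispose(): void;
}

class MiniSubject<T> {
  private observers: Array<(value: T) => void> = [];

  // Register an observer and hand back a way to unregister it.
  subscribe(onNext: (value: T) => void): SimpleDisposable {
    this.observers.push(onNext);
    return {
      dispose: () => {
        this.observers = this.observers.filter((o) => o !== onNext);
      },
    };
  }

  // Deliver one value to every observer registered right now.
  onNext(value: T): void {
    // Copy first so a dispose() during delivery does not disturb the loop.
    this.observers.slice().forEach((o) => o(value));
  }
}

const subject = new MiniSubject<string>();
const seenByA: string[] = [];
const seenByB: string[] = [];

const subscriptionA = subject.subscribe((v) => seenByA.push(v));
subject.subscribe((v) => seenByB.push(v));

subject.onNext("first"); // delivered to A and B
subscriptionA.dispose();
subject.onNext("second"); // delivered to B only
```

Note that an observer only receives values pushed after it subscribed, which is why the replay step described further down matters for components that are created late.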

  static pushAvatarValue(user:User) {  
   AccountTools.pushEvent(  
    new UserEvent(UserEventType.AVATAR_UPDATE, {  
     userId: user._id,  
     imageURL: AccountTools.getAvatarURL(user)  
    })  
   );  
  }  

The pushAvatarValue method is a more specialized version of pushEvent that pushes a new avatar URL to the observers.
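
UserEvent and UserEventType are declared elsewhere and are not shown in this post. Purely as an assumption, one shape that would be consistent with how they are constructed above and read by the consumer below (eventType, userId, imageURL) might look like the following. The LOGIN member and the UserEventData interface are hypothetical names added for illustration.

```typescript
// Hypothetical declarations; the real UserEvent is defined elsewhere in the app.
const UserEventType = {
  LOGIN: "LOGIN", // assumed member, not taken from the post
  AVATAR_UPDATE: "AVATAR_UPDATE",
} as const;
type UserEventTypeValue = typeof UserEventType[keyof typeof UserEventType];

// Assumed payload shape, based on the fields pushAvatarValue passes in.
interface UserEventData {
  userId?: string;
  imageURL?: string;
}

class UserEvent {
  eventType: UserEventTypeValue;
  userId: string | undefined;
  imageURL: string | undefined;

  constructor(eventType: UserEventTypeValue, data: UserEventData = {}) {
    this.eventType = eventType;
    // Copy the payload onto the event so consumers can read event.userId directly.
    this.userId = data.userId;
    this.imageURL = data.imageURL;
  }
}

const avatarEvent = new UserEvent(UserEventType.AVATAR_UPDATE, {
  userId: "u1",
  imageURL: "/avatars/u1.png",
});
```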

  static startObserving(
      onNext:(event:UserEvent)=>void,
      onError:(error:any)=>void=null,
      onComplete:()=>void=null):Disposable {
   let returnValue:Disposable =
      AccountTools.loginStatusSubject.subscribe(onNext, onError, onComplete);
   Tracker.autorun(  
    ()=>{  
     if (!AccountTools.userCursor) {  
      AccountTools.userCursor = Meteor.users.find();  
      AccountTools.userCursor.observeChanges({  
       added: (_id, doc:User)=>{  
        AccountTools.pushAvatarValue(doc);  
       },  
       changed:(_id,doc)=>{   
        console.log('CHANGED'); // TO DO?  
        console.log(doc);  
       }  
      });  
     } else {  
      AccountTools.userCursor.forEach((user:User)=>{  
       AccountTools.pushAvatarValue(user);  
      })  
     }  
    }  
   );  
   return returnValue;  
  }  

The startObserving function is a wrapper around the RxJS subscribe method and has the same signature. After the subscription is created, Meteor’s Tracker.autorun is called, setting up a reactive computation (Meteor’s meaning of reactive, not to be confused with RxJS reactive programming) over all the users the current user can “see”. (A Meteor subscription was set up elsewhere. Don’t confuse the Meteor.subscribe terminology with the RxJS subscribe. While they are similar concepts, they are totally different things.)

Meteor runs the function immediately and reruns it each time the data changes. So, if a friend changes his or her avatar, the function provided to Tracker.autorun will execute again.
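
The run-now-and-rerun-on-change behaviour can be sketched with a tiny dependency tracker. Everything here (MiniDependency, MiniReactiveVar, miniAutorun) is a simplified stand-in written for this example, not Meteor’s Tracker. In particular, this sketch reruns synchronously, whereas Meteor defers reruns to a later flush.

```typescript
// Simplified stand-in for Meteor's Tracker, for illustration only.
type Rerun = () => void;
let activeComputation: Rerun | null = null;

class MiniDependency {
  private dependents: Rerun[] = [];

  // Called by a data source when it is read inside a computation.
  depend(): void {
    const active = activeComputation;
    if (active !== null && this.dependents.indexOf(active) === -1) {
      this.dependents.push(active);
    }
  }

  // Called by a data source when its value changes.
  changed(): void {
    this.dependents.slice().forEach((rerun) => rerun());
  }
}

function miniAutorun(fn: () => void): void {
  const run = (): void => {
    const previous = activeComputation;
    activeComputation = run;
    try {
      fn();
    } finally {
      activeComputation = previous;
    }
  };
  run(); // first run happens immediately
}

class MiniReactiveVar<T> {
  private value: T;
  private dep = new MiniDependency();

  constructor(initial: T) {
    this.value = initial;
  }

  get(): T {
    this.dep.depend();
    return this.value;
  }

  set(next: T): void {
    this.value = next;
    this.dep.changed();
  }
}

const avatarUrl = new MiniReactiveVar<string>("/avatars/old.png");
const runs: string[] = [];
miniAutorun(() => {
  runs.push(avatarUrl.get());
});
avatarUrl.set("/avatars/new.png"); // triggers a second run
```

The function only reruns because it read a reactive source during its previous run; code that reads nothing reactive would run once and never again.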

The Cursor, being static, will only be instantiated once for the entire app, allowing all the components to use the same resource.

During the first call, the cursor is populated and an event is sent to any observers for each document in the collection.

If the cursor has already been populated by a previous call, the previous results are iterated over, generating events for the existing records so that a component that displays an avatar can get access to the current URL. This behavior makes the subscription “cold”, or like a movie, in that it replays from the start to all observers. (Almost: a latecomer would not get, nor want, old values that have since changed.)
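
Here is a minimal sketch of that “almost cold” idea: a late observer first receives the current value for each user and then any later changes. LatestAvatarFeed and AvatarUpdate are hypothetical names for this illustration. The sketch is a variation on the code above rather than a copy of it, because it replays only to the newcomer, whereas the forEach in startObserving pushes through the shared Subject and so reaches every observer.

```typescript
// Hypothetical names for illustration; not part of the app's code.
interface AvatarUpdate {
  userId: string;
  imageURL: string;
}
type AvatarListener = (update: AvatarUpdate) => void;

class LatestAvatarFeed {
  // Only the most recent update per user is kept for replay.
  private latest: AvatarUpdate[] = [];
  private listeners: AvatarListener[] = [];

  push(update: AvatarUpdate): void {
    const kept = this.latest.filter((u) => u.userId !== update.userId);
    kept.push(update);
    this.latest = kept;
    this.listeners.forEach((listener) => listener(update));
  }

  // New observers first receive the current value for every known user.
  startObserving(listener: AvatarListener): void {
    this.latest.forEach((update) => listener(update));
    this.listeners.push(listener);
  }
}

const feed = new LatestAvatarFeed();
feed.push({ userId: "u1", imageURL: "/a/u1-old.png" });
feed.push({ userId: "u1", imageURL: "/a/u1-new.png" });
feed.push({ userId: "u2", imageURL: "/a/u2.png" });

// Subscribes after three pushes: gets u1's current avatar, not the old one.
const lateComer: string[] = [];
feed.startObserving((u) => lateComer.push(u.userId + ":" + u.imageURL));
feed.push({ userId: "u2", imageURL: "/a/u2-new.png" });
```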

The following is an example of a consumer that uses the service (an Angular 1 app, written in Angular 2 style).

 @Component({  
  module: 'common',  
  selector: 'avatar',  
  controllerAs: 'vm',  
  controller: Avatar,  
  bindings: {  
   userId: '@',  
   size: '@',  
   shape: '@'  
  },  
  template: `  
   <img src="{{vm.getImageUrl()}}"/>  
 `,  
 })  
 export class Avatar {  
  private userId:string;  
  private imageURL:string;  
  disposable:Disposable;  
  private $scope:any;  
  ...  
  $onInit() {  
   this.disposable = AccountTools.startObserving((event:UserEvent)=>{  
    if (  
      event.eventType===UserEventType.AVATAR_UPDATE &&   
      event.userId===this.userId)   
    {  
     this.imageURL = event.imageURL;  
     timeoutApply(this.$scope) // Angular 1 nonsense  
    }  
   });  
  }  
  $onDestroy() {  
   if (this.disposable) {  
    this.disposable.dispose();  
   }  
  }  
  getImageUrl():string {  
   return this.imageURL;  
  }  
 }  

Here, when the (Angular 1.5) component is initialized, the startObserving function is called. The given callback is called with the initial values (because of the “cold” observable behaviour) and called again with any subsequent changes.

So, there you have it.  An Angular / Meteor app using RxJS observables.  This did solve real problems.  Previous versions had intermittent issues, especially during start up, where avatars weren’t displayed correctly.  There was an on-going, inconclusive wrestling match as I tried to get things happening at the right time.  This approach is a welcome relief.

This pattern is only effective when the order of events is not important. (E.g., if user 1 updates his avatar and then user 2 updates his avatar, there is no harm done if the app first updates user 2 and then updates user 1.) This is a real possibility because there is no guarantee that inserts to the database arrive on the client in the same order.
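
A small sketch of why the order does not matter in that example, and of the case where it would. AvatarChange and finalDisplay are hypothetical helpers written for this illustration; they simply record the last URL seen for each user.

```typescript
// Hypothetical helpers for illustration; not part of the app's code.
interface AvatarChange {
  userId: string;
  imageURL: string;
}

// Apply changes in the order given and summarise what each user's avatar ends up as.
function finalDisplay(changes: AvatarChange[]): string {
  const shown: { [userId: string]: string } = {};
  changes.forEach((change) => {
    shown[change.userId] = change.imageURL;
  });
  return Object.keys(shown)
    .sort()
    .map((userId) => userId + "=" + shown[userId])
    .join(",");
}

const user1Change: AvatarChange = { userId: "u1", imageURL: "/a/u1.png" };
const user2Change: AvatarChange = { userId: "u2", imageURL: "/a/u2.png" };

// Updates for different users: either arrival order gives the same display.
const differentUsersCommute =
  finalDisplay([user1Change, user2Change]) ===
  finalDisplay([user2Change, user1Change]);

// Two updates for the SAME user: here the arrival order would matter.
const older: AvatarChange = { userId: "u1", imageURL: "/a/u1-v1.png" };
const newer: AvatarChange = { userId: "u1", imageURL: "/a/u1-v2.png" };
const sameUserCommutes =
  finalDisplay([older, newer]) === finalDisplay([newer, older]);
```

In this sketch differentUsersCommute is true and sameUserCommutes is false, which is the situation where a pattern that guarantees ordering would be needed.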

As I mentioned, I’m quite new to RxJS, so any comments are much appreciated.

In a future blog post, I’ll describe another pattern that attempts to guarantee the order.

 
