
Asynchronous Programming in Ruby

In traditional programming practice, I/O operations happen synchronously: the main thread is blocked until the I/O operation completes, and the CPU sits idle in the meantime. Consider the following example, where the program is blocked until the query has finished executing.


  # query mongo db collection for all documents
  data = collection.find({})
  puts "number of documents in collection  - #{data.count}"
  puts "hello"

  ----- output------
   number of documents in collection  - 8
   hello

In the above code, hello is printed only after the query has finished executing, and the CPU remains idle while it waits.

To overcome this problem, Node.js adopted an asynchronous programming model in which the code does not wait for an I/O operation to complete before moving on. Node.js uses an event loop to support this model. Now consider the following Node.js code for the same functionality.

  data = collection.find({}, function(err, docs) {
    console.log("number of documents in collection  -",docs.length);
  });

  console.log("hello");

  ----- Output -------
  hello
  number of documents in collection  - 8

It is evident from the above example that the query does not block the main thread: when the query finishes executing, the event loop invokes the callback with the query's output.
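The same ordering can be reproduced in plain Ruby without any library. The following is a minimal sketch, not RxRuby and not the MongoDB driver: `fake_find` is a hypothetical stand-in that sleeps to simulate query latency, and a background thread plays the role of the non-blocking call.

```ruby
# Simulated slow query; `fake_find` is a hypothetical stand-in for
# collection.find({}) and only pretends to talk to a database.
def fake_find
  sleep 0.2                          # pretend the database takes 200 ms
  Array.new(8) { |i| { id: i } }     # eight fake documents
end

# Runs the query on a background thread and records the order of events.
def run_non_blocking
  events = Queue.new                 # thread-safe log of what happened

  worker = Thread.new do
    docs = fake_find
    events << "number of documents in collection - #{docs.length}"
  end

  events << "hello"                  # the main thread continues immediately
  worker.join                        # wait for the query before returning

  Array.new(events.size) { events.pop }
end

puts run_non_blocking
```

Here "hello" is logged first because the main thread does not wait for the simulated query. In MRI, threads waiting on I/O or `sleep` release the global lock, so other threads can run in the meantime.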

One question that comes to mind is how this paradigm differs from the pub/sub pattern. Pub/sub relates to messaging, in which two different applications (or modules) communicate by notifying each other, while asynchronous programming is about reacting to the state change of an object within the same application.

The same functionality can be achieved in Ruby using RxRuby. RxRuby supports event-based programming in Ruby and implements the Observer pattern. In the Observer pattern, a “Subject” that holds the data notifies its “Observers” when its state changes.

RxRuby is built on the principles of ReactiveX. In ReactiveX, an observer is an object that subscribes to the state of a subject; whenever the subject emits an item or a sequence of items, the observer reacts to it.
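To make the Observer pattern concrete, here is a minimal hand-rolled sketch in plain Ruby. It is not RxRuby's implementation; the class name `SimpleSubject` and its methods are hypothetical and exist only for illustration.

```ruby
# Hypothetical minimal subject: holds a value and notifies every
# registered observer whenever that value changes.
class SimpleSubject
  attr_reader :value

  def initialize
    @observers = []
  end

  # Register a block as an observer; returns self so calls can be chained.
  def subscribe(&observer)
    @observers << observer
    self
  end

  # Change the state and push the new value to every observer.
  def value=(new_value)
    @value = new_value
    @observers.each { |observer| observer.call(new_value) }
  end
end

# Collects what two observers see after a single state change.
def observe_once(new_value)
  received = []
  subject = SimpleSubject.new
  subject.subscribe { |v| received << "first saw #{v}" }
  subject.subscribe { |v| received << "second saw #{v}" }
  subject.value = new_value
  received
end

puts observe_once(42)
```

The subject knows nothing about what its observers do with the value; it only promises to call them on each change, which is the relationship the paragraphs above describe.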

In the following section, we will execute two Mongo queries asynchronously and process the result of each query when its response is emitted.

In the above code, the Rx::Observable.start function converts a function into an observable subject that notifies its subscribers of its state changes. It takes three arguments:

  • the observable function
  • the context in which the observable function will execute
  • the thread on which the Observable will execute
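How a function, a context, and a thread fit together can be sketched in plain Ruby. This is an illustration under stated assumptions, not RxRuby's code: `start_on_thread` and `FakeRepository` are hypothetical names, and the sketch returns a bare thread rather than an Observable.

```ruby
# Hypothetical helper, NOT RxRuby's implementation: runs `func` with
# `context` as self on a new thread and returns that thread.
def start_on_thread(func, context)
  Thread.new { context.instance_exec(&func) }
end

# Hypothetical context object standing in for something that owns data.
class FakeRepository
  def initialize(docs)
    @docs = docs
  end
end

repo = FakeRepository.new([1, 2, 3])
count_docs = proc { @docs.length }   # @docs resolves against the context

worker = start_on_thread(count_docs, repo)
puts worker.value                    # Thread#value joins, then returns the result
```

The context decides what `self` (and therefore instance variables) means inside the function, while the thread decides where the work runs.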

I have created two observables from Mongo queries: the first is an aggregate query and the second is a find query. The find query takes less time to execute than the aggregate query.

In the above code, the subscribe function acts as the subscriber to the state changes of the Observable object. It takes three arguments:

  • onNext — acts when observable emits an object
  • onError — acts when observable emits an error
  • onCompleted — acts when observable notifies that stream has completed.

I have created two observers, each observing the state of one of the Mongo query observables. As soon as an Observable emits a value, its subscriber processes that value.
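The flow described above (two queries of different durations, each with its own subscriber) can be approximated with a small stdlib-only sketch. It is not RxRuby: `TinyObservable` is a hypothetical class, the keyword names `on_next`, `on_error`, and `on_completed` are chosen for illustration, and `sleep` stands in for real Mongo query latency.

```ruby
# Hypothetical, minimal observable; NOT RxRuby. It runs a block on its
# own thread and reports the outcome through three callbacks.
class TinyObservable
  def initialize(&work)
    @work = work
  end

  # Starts the work and returns the thread so callers can wait for it.
  def subscribe(on_next:, on_error: ->(_error) {}, on_completed: -> {})
    Thread.new do
      begin
        on_next.call(@work.call)   # emit the single result
        on_completed.call          # then signal the end of the stream
      rescue StandardError => e
        on_error.call(e)           # or report the failure instead
      end
    end
  end
end

def run_two_queries
  log = Queue.new                  # thread-safe record of emissions

  # sleep simulates latency: the aggregate is slower than the find.
  aggregate = TinyObservable.new { sleep 0.3; "aggregate result" }
  find      = TinyObservable.new { sleep 0.1; "find result" }

  threads = [
    aggregate.subscribe(on_next: ->(v) { log << v }),
    find.subscribe(on_next: ->(v) { log << v },
                   on_completed: -> { log << "find completed" })
  ]
  threads.each(&:join)

  Array.new(log.size) { log.pop }
end

puts run_two_queries
```

Although the aggregate is subscribed to first, the find result is logged first because its simulated query finishes sooner; each subscriber reacts as soon as its own observable emits.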

You can check out the full source code of the above example below.

Note: At present, the documentation for RxRuby is insufficient for production-level usage.

