RxJava Tutorial on RxJava Computation Scheduler

The Schedulers.computation() method creates and returns a Scheduler intended for computational work. The number of threads available for scheduling depends on the number of CPUs present in the system: by default, one thread is allowed per CPU. It is best suited for event loops or callback operations.
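The one-thread-per-CPU sizing rule can be illustrated without RxJava at all. The following is a minimal sketch using only the plain JDK: it builds a fixed thread pool sized with Runtime.getRuntime().availableProcessors() and runs a small task per item on it. This is an analogy for the sizing rule described above, not RxJava's internal implementation; the class name CpuSizedPoolSketch and the methods poolSize() and lengthsOnPool() are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration class: a plain-JDK pool with one thread per CPU.
public class CpuSizedPoolSketch {

    // One worker thread per CPU reported by the JVM.
    public static int poolSize() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Computes the length of each item on the pool; results keep input order.
    public static List<Integer> lengthsOnPool(List<String> items) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize());
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (String item : items) {
                futures.add(pool.submit(() -> {
                    System.out.println("Processing Thread "
                        + Thread.currentThread().getName());
                    return item.length();
                }));
            }
            List<Integer> lengths = new ArrayList<>();
            for (Future<Integer> future : futures) {
                lengths.add(future.get()); // blocks until that task has finished
            }
            return lengths;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("Task did not complete", e);
        } finally {
            pool.shutdown(); // let the worker threads exit
        }
    }

    public static void main(String[] args) {
        System.out.println("Pool size: " + poolSize());
        System.out.println("Lengths: " + lengthsOnPool(Arrays.asList("A", "AB", "ABC")));
    }
}
```

Because the pool never holds more threads than there are CPUs, CPU-bound tasks do not compete with each other for cores. A task that blocks, for example by sleeping or waiting on I/O, holds one of those few threads for the whole wait.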

Schedulers.computation() Example

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         // Each inner Observable is subscribed on a computation thread
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.computation()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

      // Keep the JVM alive long enough for the asynchronous work to complete
      Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         // Simulate work that takes 0, 1 or 2 seconds
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      // flatMap rejects a null return value, so emit nothing instead
      return Observable.empty();
   }
}

Verify the Result

Compile the class using the javac compiler as follows:

C:\RxJava>javac ObservableTester.java

Now run the ObservableTester as follows:

C:\RxJava>java ObservableTester

It should produce output similar to the following (thread numbers and line order may vary with the number of CPUs and the random delays):

Processing Thread RxComputationThreadPool-1
Receiver Thread RxComputationThreadPool-1, Item length 1
Processing Thread RxComputationThreadPool-2
Receiver Thread RxComputationThreadPool-2, Item length 2
Processing Thread RxComputationThreadPool-3
Receiver Thread RxComputationThreadPool-3, Item length 3