RxJava Tutorial: Buffering

The buffering operator gathers the items emitted by an Observable into lists (bundles) and emits those bundles instead of the individual items. In the example below, we create an Observable that emits 9 items and apply buffer(3), so the items reach the observer in groups of 3.
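To make the grouping concrete before bringing in RxJava, here is a minimal plain-Java sketch of count-based bundling. The class BufferSketch and its buffer helper are hypothetical names used only for illustration; they are not part of RxJava, and the sketch mimics only the grouping, not the asynchronous delivery. It also shows the case where the item count is not a multiple of the bundle size: the leftover items form a final, shorter bundle, which is how RxJava's buffer(count) behaves when the source completes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BufferSketch {
    // Hypothetical helper (not an RxJava API): splits items into
    // consecutive bundles holding at most `count` elements each.
    public static <T> List<List<T>> buffer(List<T> items, int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        List<List<T>> bundles = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : items) {
            current.add(item);
            if (current.size() == count) {
                bundles.add(current);        // bundle is full: hand it over
                current = new ArrayList<>(); // start collecting the next one
            }
        }
        if (!current.isEmpty()) {
            bundles.add(current);            // leftovers become a shorter final bundle
        }
        return bundles;
    }

    public static void main(String[] args) {
        // 9 items in bundles of 3: prints [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
        System.out.println(buffer(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9), 3));
        // 7 items in bundles of 3: prints [[1, 2, 3], [4, 5, 6], [7]]
        System.out.println(buffer(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3));
    }
}
```

With the real operator, each inner list would arrive as a separate onNext call rather than as one returned list.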

Buffering Example

Create the following Java program using any editor of your choice in, say, C:\> RxJava.

ObservableTester.java

// RxJava 2.x package names
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         // Shift every emission by 2 seconds on the IO scheduler
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         // Collect items into lists of 3 and emit each list as one item
         .buffer(3)
         .subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("subscribed");
            }
            @Override
            public void onNext(List<Integer> integers) {
               // Called once per bundle, not once per item
               System.out.println("onnext: ");
               for (Integer value : integers) {
                  System.out.println(value);
               }
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("error");
            }
            @Override
            public void onComplete() {
               System.out.println("done! ");
            }
         });
      // Keep the main thread alive until the delayed emissions arrive
      Thread.sleep(3000);
   }
}

Verify the Result

Compile the class using the javac compiler as follows (this assumes the RxJava jar is already on your CLASSPATH):

C:\RxJava>javac ObservableTester.java

Now run ObservableTester as follows:

C:\RxJava>java ObservableTester

It should produce the following output:

subscribed
onnext: 
1
2
3
onnext: 
4
5
6
onnext: 
7
8
9
done!