The buffering operator gathers the items emitted by an Observable into lists (bundles) and emits those bundles instead of the individual items. In the example below, we create an Observable that emits 9 items and, using buffering, emit them 3 at a time.
Buffering Example
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
ObservableTester.java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         // Shift every emission by 2 seconds on the io scheduler.
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         // Collect the items into lists of 3 and emit each list as one item.
         .buffer(3)
         .subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(List<Integer> integers) {
               System.out.println("onNext: ");
               for (Integer value : integers) {
                  System.out.println(value);
               }
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }
            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });

      // Keep the main thread alive until the delayed emissions have arrived.
      Thread.sleep(3000);
   }
}
Verify the Result
Compile the class using the javac compiler as follows:
C:\RxJava>javac ObservableTester.java
Now run ObservableTester as follows:
C:\RxJava>java ObservableTester
It should produce the following output:
Subscribed
onNext:
1
2
3
onNext:
4
5
6
onNext:
7
8
9
Done!
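To see the grouping on its own, without schedulers or delays, the sketch below reproduces the bundling in plain Java. It is only an illustration of the idea, not how RxJava implements buffer(): the class name BufferSketch and the helper bufferBy are hypothetical names invented for this example. It also shows the case the example above does not cover, a source whose item count is not a multiple of the bundle size. There the leftover items end up in a final, shorter bundle, which is also what the RxJava documentation describes for buffer(count) when the source completes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; this is not RxJava's implementation of buffer().
class BufferSketch {

   // Hypothetical helper: splits items into consecutive bundles of at most `count` elements.
   static <T> List<List<T>> bufferBy(List<T> items, int count) {
      if (count <= 0) {
         throw new IllegalArgumentException("count must be positive");
      }
      List<List<T>> bundles = new ArrayList<>();
      List<T> current = new ArrayList<>();
      for (T item : items) {
         current.add(item);
         // A full bundle is handed over and a fresh one is started.
         if (current.size() == count) {
            bundles.add(current);
            current = new ArrayList<>();
         }
      }
      // Leftover items form a final, shorter bundle.
      if (!current.isEmpty()) {
         bundles.add(current);
      }
      return bundles;
   }

   public static void main(String[] args) {
      // prints [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
      System.out.println(bufferBy(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9), 3));
      // prints [[1, 2, 3], [4, 5, 6], [7]]
      System.out.println(bufferBy(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3));
   }
}
```

The first call mirrors the 9-item example above; the second shows the shorter trailing bundle.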