RxPY Tutorial on RxPY Working With Observables

an observable, is a function that creates an observer and attaches it to the source where values are expected, for example, clicks, mouse events from a dom element, etc.

the topics mentioned below will be studied in detail in this chapter.

  • create observables

  • subscribe and execute an observable

create observables

to create an observable we will use create() method and pass the function to it that has the following items.

  • on_next() − this function gets called when the observable emits an item.

  • on_completed() − this function gets called when the observable is complete.

  • on_error() − this function gets called when an error occurs on the observable.

to work with create() method first import the method as shown below −

from rx import create

here is a working example, to create an observable −

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("hello")
   observer.on_error("error")
   observer.on_completed()
source = create(test_observable).

subscribe and execute an observable

to subscribe to an observable, we need to use subscribe() function and pass the callback function on_next, on_error and on_completed.

here is a working example −

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("hello")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("got - {0}".format(i)),
   on_error = lambda e: print("error : {0}".format(e)),
   on_completed = lambda: print("job done!"),
)

the subscribe() method takes care of executing the observable. the callback function on_next, on_error and on_completed has to be passed to the subscribe method. call to subscribe method, in turn, executes the test_observable() function.

it is not mandatory to pass all three callback functions to the subscribe() method. you can pass as per your requirements the on_next(), on_error() and on_completed().

the lambda function is used for on_next, on_error and on_completed. it will take in the arguments and execute the expression given.

here is the output, of the observable created −

e:\pyrx>python testrx.py
got - hello
job done!