thread troubles (v1 to v2)

Apr 11, 2015 at 3:36 PM
I am having trouble moving from rx.cpp v1 to rx.cpp v2. I am trying to create multiple threads on which to observe/subscribe. I tried using:
rxcpp::observe_on_one_worker(rxcpp::observe_on_new_thread().create_coordinator().get_scheduler())
but looking at the code this seems to reuse one thread (so in my example below, in the DOES_WORK block, only the second [thread2] prints out values). I tried to just incorporate what rxcpp::observe_on_new_thread does into my main routine (DOES_NOT_WORK_1), but it fails to compile with the error:
In file included from range.cc:3:
In file included from ../distro/rxcpp/Rx/v2/src/rxcpp/rx.hpp:8:
In file included from ../distro/rxcpp/Rx/v2/src/rxcpp/rx-includes.hpp:133:
In file included from ../distro/rxcpp/Rx/v2/src/rxcpp/rx-operators.hpp:65:
../distro/rxcpp/Rx/v2/src/rxcpp/operators/rx-subscribe_on.hpp:21:41: error: no type named 'coordinator_type' in
      'rxcpp::schedulers::scheduler'
    typedef typename coordination_type::coordinator_type coordinator_type;
            ~~~~~~~~~~~~~~~~~~~~~~~~~~~~^~~~~~~~~~~~~~~~
I also tried creating the threads in a way that seems consistent with other Rx implementations (DOES_NOT_WORK_2), but this also fails to compile with the error above and also with:
range.cc:33:10: error: no matching member function for call to 'subscribe_on'
    vals.subscribe_on(thread1).subscribe( cs, [&](int i) { if ( i < 50 ) {
    ~~~~~^~~~~~~~~~~~
All of this leads me to think that I am probably approaching this the wrong way. I would appreciate any advice on the right way to create subscribe and observe threads in rx.cpp v2. Below is my test case, and only the DOES_WORK #ifdef block compiles and runs.

Thanks very much for any help or advice!
#include <unistd.h>         // usleep(...)
#include <iostream>
#include "rxcpp/rx.hpp"

using std::cout;
using std::endl;

int main( int argc, char *argv[] ) {
    auto vals  = rxcpp::observable<>::range(1,100);
    cout << "------ serial --------" << endl;
    rxcpp::composite_subscription cs;
    vals.subscribe( cs, [&](int i) { if ( i < 50 )
                                         cout << "A" << i << endl;
                                     else
                                         cs.unsubscribe( ); } );
    vals.subscribe( [&](int i) { cout << "B" << i << endl; } );

    cout << "------ parallel ------" << endl;
#if DOES_NOT_WORK_1
    static rxcpp::observe_on_one_worker thread1(rxcpp::rxsc::make_new_thread());
    static rxcpp::observe_on_one_worker thread2(rxcpp::rxsc::make_new_thread());
    vals.subscribe_on(thread1.create_coordinator().get_scheduler()).subscribe( cs,
                [&](int i) { if ( i < 50 ) {
                                    cout << "A" << i << endl;
                                    std::this_thread::yield( );
                                } else cs.unsubscribe( ); } );
    vals.subscribe_on(thread2).subscribe(
                [&](int i) { cout << "B" << i << endl; std::this_thread::yield(); } );
#endif
#if DOES_NOT_WORK_2
    auto thread1 = std::make_shared<rxcpp::schedulers::event_loop>( );
    auto thread2 = std::make_shared<rxcpp::schedulers::event_loop>( );
    thread1->create_coordinator( );
    vals.subscribe_on(thread1).subscribe(cs,
               [&](int i) { if ( i < 50 ) {
                                   cout << "A" << i << endl;
                                         std::this_thread::yield( );
                               } else cs.unsubscribe( ); } );
    vals.subscribe_on(thread2).subscribe( [&](int i) { cout << "B" << i << endl; std::this_thread::yield(); } );
#endif
#if DOES_WORK
    auto thread1 = rxcpp::observe_on_one_worker(rxcpp::observe_on_new_thread().create_coordinator().get_scheduler());
    auto thread2 = rxcpp::observe_on_one_worker(rxcpp::observe_on_new_thread().create_coordinator().get_scheduler());
    vals.subscribe_on(thread1).subscribe( cs,
                [&](int i) { if ( i < 50 ) {
                                    cout << "A" << i << endl;
                                     std::this_thread::yield( );
                                } else cs.unsubscribe( ); } );
    vals.subscribe_on(thread2).subscribe( [&](int i) { cout << "B" << i << endl; std::this_thread::yield(); } );
#endif
    sleep(60);
}
Coordinator
Apr 12, 2015 at 3:54 AM
Yes, coordination is a new concept in v2. The coordination combines a scheduler thread scheduling strategy (pool, new, trampoline, none) with a cross-thread coordination strategy (none, mutex, queue). I have described this here as well (github, github)

I think that I recently found a way to remove the coordination concept and put the coordination strategy into the scheduler.

Until then here are some ways to use coordinations in your example.

This will do a new thread for every subscription, but does not serialize the access to cout so the output will be corrupted and does not wait for the streams to end so the sleep is required.
#if DOES_WORK_2
    auto threadpersubscription = rxcpp::observe_on_new_thread();
    vals.subscribe_on(threadpersubscription).subscribe( cs,
                [&](int i) { if ( i < 50 ) {
                                    cout << "A" << i << endl;
                                     std::this_thread::yield( );
                                } else cs.unsubscribe( ); } );
    vals.subscribe_on(threadpersubscription).subscribe( [&](int i) { cout << "B" << i << endl; std::this_thread::yield(); } );

    this_thread::sleep_for(60s);
#endif
But I would recommend this approach. This will do a thread per source, and a thread for the merged output and display the thread id for the source and output threads used for each item and take() will stop 'A' early without needing to have access to the subscription and the subscribe will block until all the items are printed.
    {
        auto threadpersubscription = rxcpp::observe_on_new_thread();
        vals.
            subscribe_on(threadpersubscription).
            map([](int i){std::this_thread::yield(); std::stringstream out; out << std::this_thread::get_id() << " A" << i; return out.str();}).
            take(50).
            merge(threadpersubscription, vals.
                subscribe_on(threadpersubscription).
                map([](int i){std::this_thread::yield(); std::stringstream out; out << std::this_thread::get_id() << " B" << i; return out.str();})).
            as_blocking().
            subscribe( [&](std::string s) { cout << std::this_thread::get_id() << " - " << s << endl;  } );
    }
The output would look like
. . .
------ parallel ------
8944 - 10152 A1
8944 - 304 B1
8944 - 10152 A2
8944 - 304 B2
. . . 
8944 - 10152 A49
8944 - 304 B49
8944 - 10152 A50
8944 - 304 B50
8944 - 304 B51
. . .
8944 - 304 B100