This project is read-only.

ToAsync(f) w/ v2 coordination

Jan 27, 2016 at 7:09 PM
While I think the v2 semantics are often more straightforward from a C++ point of view, I also find it useful to be able to cull ideas from Rx examples in other languages. One of the v1 operators that I used was ToAsync(f) so I thought I would see what was required to implement it with Rx.cpp v2:
template<class... A, class F>
auto ToAsync( F f )
    -> std::function <rxcpp::observable<decltype(f((*(A*)nullptr)...)) > (const A&...)>
{
    typedef decltype(f((*(A*) nullptr)...)) R;

    return [=](const A&... a) -> rxcpp::observable<R>
        {
            auto result = rxcpp::subjects::subject<R>( );
            rxcpp::observe_on_new_thread().create_coordinator().get_worker().schedule(
                    [=](const rxcpp::rxsc::schedulable&) {
                        rxcpp::util::maybe<R> value;
                        try {
                            value = f(a...);
                            result.get_subscriber( ).on_next(value.get( ));
                            result.get_subscriber( ).on_completed( );
                        } catch (...) {
                            result.get_subscriber( ).on_error(std::current_exception());
                        }
            });
            return result.get_observable( );
        };
}
My questions are how is the best way to parameterize this for different coordinators, and how is the best way to have the coordinator default to something sensible (and I guess, does this code get the most sensible default coordinator in a reasonable way)?

thanks for all the help...
Coordinator
Jan 28, 2016 at 4:08 AM
Hi!

ToAsync would be a nice addition!

This implementation is a good start, but would have to return the observable<> from an AsyncSubject<>. The AsyncSubject<> solves the same race condition that promise/future does by storing the value or the error result of f(a...) and ensuring delivery to each new observer<> that is subscribed.

It would be nice to build an explicit functor for the return value, since std::function<> may allocate memory, but this not required in this case.

The coordination is handled by having two overloads. The Coordination would be passed as a templated type with a types_checked_from<> to ensure that the method is only compiled if the type is a Coordination
    template<class... A, class F>
    auto ToAsync( F f) const . . .

    template<class... A, class F, 
             class Coordination, 
             class Requires = typename rxu::types_checked_from<typename Coordination::coordination_tag>::type>
    auto ToAsync( F f , Coordination coordination) const . . .
FYI - Active dev is in github these days.