
Error with observable::create

Jul 11, 2016 at 12:00 AM
I'm learning Reactive Extensions for C++ and I'm looking for some guidance. I've written a function that waits for an event and returns it. I want to catch every event asynchronously with Rx and handle each one as it arrives. Here's what I have so far:
int Engine::Initialize()
{
al_init();

display = al_create_display(1200, 800);

eventQueue = al_create_event_queue();

al_install_mouse();
al_install_keyboard();

al_register_event_source(eventQueue, al_get_keyboard_event_source());
al_register_event_source(eventQueue, al_get_mouse_event_source());
// test: wait for 2 events to happen (the error occurs here)
auto events = rxcpp::observable<>::create([](rxcpp::subscriber<ALLEGRO_EVENT> e) 
{
    e.on_next(WaitForEvent);
    e.on_next(WaitForEvent);
    e.on_completed();
});

events.subscribe([](ALLEGRO_EVENT e)
{
    printf("We have an Event: %d \n", e.type);
},

[]()
{
    printf("Done\n");
});

return 0;
}

ALLEGRO_EVENT Engine::WaitForEvent()
{
ALLEGRO_EVENT event;

al_wait_for_event(eventQueue, &event);

return event;
}
I get the error: no instance of function template "rxcpp::observable::create" matches the argument list. I also tried compiling a similar sample from the docs, with int in place of ALLEGRO_EVENT, and got the same error. I installed the NuGet package, which uses v2.3.
Coordinator
Jul 11, 2016 at 3:39 PM
Hi!

It appears that the subject takes an ALLEGRO_EVENT, but on_next is being passed a function that returns ALLEGRO_EVENT.
Jul 11, 2016 at 5:13 PM
Hi,
Thanks for the help. Which part is the subject? I'm trying to continuously collect ALLEGRO_EVENTs from the WaitForEvent function and process them.
Coordinator
Jul 11, 2016 at 9:39 PM
Sorry, I meant that the subscriber<ALLEGRO_EVENT> expects e.on_next(WaitForEvent()) not e.on_next(WaitForEvent)
Jul 11, 2016 at 9:50 PM
Edited Jul 11, 2016 at 9:51 PM
Hmm, now I get a new error (see screenshot):

https://1drv.ms/i/s!ApaHlQI97NYVr6MkW7rb011cf59nKg

Thanks for your help.
Coordinator
Jul 11, 2016 at 10:00 PM
Based on the errors, try this:
auto events = rxcpp::observable<>::create<ALLEGRO_EVENT>([this](rxcpp::subscriber<ALLEGRO_EVENT> e) 
{
    e.on_next(this->WaitForEvent());
    e.on_next(this->WaitForEvent());
    e.on_completed();
});
Jul 12, 2016 at 2:17 AM
It seems to work now! One last question: right now it only calls WaitForEvent twice. If I wanted to keep calling it indefinitely until I call unsubscribe, how would I do so?
Coordinator
Jul 12, 2016 at 2:42 AM
Storing a subscription in Engine and passing it to subscribe would allow anything with access to Engine to unsubscribe.
auto events = rxcpp::observable<>::create<ALLEGRO_EVENT>([this](rxcpp::subscriber<ALLEGRO_EVENT> e) 
{
    while(e.is_subscribed()) {
        e.on_next(this->WaitForEvent());
    }
    e.on_completed();
});

events.subscribe(this->eventsLifetime, 

[](ALLEGRO_EVENT e)
{
    printf("We have an Event: %d \n", e.type);
},

[]()
{
    printf("Done\n");
});
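One caveat with the loop above: is_subscribed() is only checked between events, so if WaitForEvent blocks, an unsubscribe presumably takes effect only once the next event arrives. A minimal sketch of a polling variant follows, using only the standard library. Lifetime, FakeQueue, poll_event, and pump are hypothetical stand-ins, not rxcpp or Allegro APIs; in Allegro the polling role could likely be played by al_wait_for_event_timed.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <vector>

// Hypothetical stand-in for a subscription lifetime (not the rxcpp type).
struct Lifetime {
    bool subscribed = true;
    bool is_subscribed() const { return subscribed; }
    void unsubscribe() { subscribed = false; }
};

// Hypothetical stand-in for an event queue with a non-blocking poll.
struct FakeQueue {
    std::deque<int> pending;

    // Returns true and fills `out` if an event was available; returns false
    // otherwise, as a timed wait would on timeout.
    bool poll_event(int& out) {
        if (pending.empty()) return false;
        out = pending.front();
        pending.pop_front();
        return true;
    }
};

// Delivers events until the lifetime is unsubscribed or max_polls is reached.
// The lifetime is re-checked on every iteration, including after a timeout.
inline std::size_t pump(FakeQueue& q, const Lifetime& life,
                        const std::function<void(int)>& on_next,
                        std::size_t max_polls) {
    std::size_t delivered = 0;
    for (std::size_t i = 0; i < max_polls && life.is_subscribed(); ++i) {
        int ev = 0;
        if (q.poll_event(ev)) {
            on_next(ev);
            ++delivered;
        }
    }
    return delivered;
}

// Queues four events; the observer unsubscribes after seeing the second one.
inline std::vector<int> run_pump_demo() {
    FakeQueue q;
    q.pending = {1, 2, 3, 4};
    Lifetime life;
    std::vector<int> seen;
    pump(q, life, [&](int ev) {
        seen.push_back(ev);
        if (ev == 2) life.unsubscribe();
    }, 100);
    assert(!life.is_subscribed());
    return seen;
}
```

Calling run_pump_demo() delivers only the first two events; the remaining two stay in the queue because the loop notices the unsubscribe before polling again.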
Also take a look at the run_loop scheduler.
You can modify the existing message loop in an app to
  • process the run_loop queue and
  • call on_next on a message subject<ALLEGRO_EVENT> for each message.
The subject (stored in the Engine) would replace the create and allow multiple subscriptions to the same events.
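The subject idea can be sketched without rxcpp to show the shape of it: the message loop pushes each event into one shared object, and any number of subscribers receive it. This is a rough illustration only. SimpleSubject, FakeEvent, and run_subject_demo are hypothetical names, and the class is a single-threaded stand-in, not the rxcpp API. In rxcpp the corresponding pieces would presumably be rxcpp::subjects::subject<ALLEGRO_EVENT> with its get_subscriber() and get_observable() members.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for ALLEGRO_EVENT; only the `type` field is modelled.
struct FakeEvent {
    int type;
};

// Hypothetical, single-threaded stand-in for a subject: it fans each value
// out to every registered observer. This is NOT the rxcpp API.
template <typename T>
class SimpleSubject {
public:
    using Observer = std::function<void(const T&)>;

    // Registers an observer and returns its index so it can be removed later.
    std::size_t subscribe(Observer o) {
        observers_.push_back(std::move(o));
        return observers_.size() - 1;
    }

    // Stops delivery to one observer without disturbing the others.
    void unsubscribe(std::size_t id) {
        if (id < observers_.size()) observers_[id] = nullptr;
    }

    // Called by the message loop once per event.
    void on_next(const T& value) const {
        for (const auto& o : observers_) {
            if (o) o(value);
        }
    }

private:
    std::vector<Observer> observers_;
};

// Simulates a message loop pushing two events into the subject and records
// what each of two subscribers saw.
inline std::vector<std::string> run_subject_demo() {
    SimpleSubject<FakeEvent> events;
    std::vector<std::string> log;

    std::size_t a = events.subscribe([&log](const FakeEvent& e) {
        log.push_back("A:" + std::to_string(e.type));
    });
    events.subscribe([&log](const FakeEvent& e) {
        log.push_back("B:" + std::to_string(e.type));
    });

    events.on_next(FakeEvent{10});  // both subscribers receive it
    events.unsubscribe(a);          // A leaves; B keeps listening
    events.on_next(FakeEvent{11});  // only B receives it
    return log;
}
```

The point of the design is that the event source no longer lives inside a create lambda: the existing message loop stays in charge of waiting for events, and subscribers can come and go independently.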