2017-07-12 1 views
2

Considérons une fonction de blocage: this_thread :: sleep_for (millisecondes (3000));RXCPP: Timeout sur la fonction de blocage

Je suis en train d'obtenir le comportement suivant:

Trigger Blocking Function    

|---------------------------------------------X 

Je veux déclencher la fonction de blocage et si elle prend trop de temps (plus de deux secondes), il devrait délai d'attente.

Je l'ai fait ce qui suit:

my_connection = observable<>::create<int>([](subscriber<int> s) { 
    auto s2 = observable<>::just(1, observe_on_new_thread()) | 
    subscribe<int>([&](auto x) { 
     this_thread::sleep_for(milliseconds(3000)); 
     s.on_next(1); 
    }); 
}) | 
timeout(seconds(2), observe_on_new_thread()); 

Je ne peux pas obtenir ce travail. Pour commencer, je pense que s ne peut pas on_next à partir d'un fil différent. Donc, ma question est, quelle est la manière réactive correcte de le faire? Comment est-ce que je peux emballer une fonction de blocage dans rxcpp et lui ajouter un timeout?

, je veux ensuite obtenir un flux de RX qui se comporte comme ceci:

Trigger    Cleanup 

|------------------------X 
          (Delay) Trigger   Cleanup 
             |-----------------X 
+0

Une autre façon que j'ai pensé que cela peut être fait est: 'auto connection = timeout.amb (blocking_function_observable)' – jc211

Répondre

0

Grande question! Ce qui précède est assez proche.

Voici un exemple d'adaptation des opérations de blocage à rxcpp. Il fait libcurl polling pour effectuer des requêtes http.

Ce qui suit devrait faire ce que vous vouliez.

auto sharedThreads = observe_on_event_loop(); 

auto my_connection = observable<>::create<int>([](subscriber<int> s) { 
     this_thread::sleep_for(milliseconds(3000)); 
     s.on_next(1); 
     s.on_completed(); 
    }) | 
    subscribe_on(observe_on_new_thread()) | 
    //start_with(0) | // workaround bug in timeout 
    timeout(seconds(2), sharedThreads); 
    //skip(1); // workaround bug in timeout 

my_connection.as_blocking().subscribe(
    [](int){}, 
    [](exception_ptr ep){cout << "timed out" << endl;} 
); 
  • subscribe_on se déroulera le create sur un fil dédié, et donc create est autorisé à bloquer ce fil.
  • timeout exécutera le minuteur sur un thread différent, qui peut être partagé avec d'autres, et transférera tous les appels on_next/on_error/on_completed à ce même thread.
  • as_blocking s'assurera que subscribe ne retournera pas jusqu'à ce qu'il soit terminé. Ceci est seulement utilisé pour empêcher main() de quitter - le plus souvent dans les programmes de test ou d'exemple.

EDIT: solution de contournement ajoutée pour un bogue dans timeout. À l'heure actuelle, il ne planifie pas le premier délai avant l'arrivée de la première valeur.

EDIT-2: timeout bug a été corrigé, la solution de contournement n'est plus nécessaire.

+0

Salut Kirk, merci pour votre contribution à ce sujet. J'ai couru l'extrait de code que vous avez suggéré. Il semble que le 'on_next' se déclenche avant le' on_error'. Mon soupçon est le 'this_thread :: sleep_for()' bloque le délai d'attente du démarrage de la minuterie interne. Si le 's.on_completed' est enlevé, un' on_next' se déclenche suivi de 'on_error' depuis le timeout. – jc211

+0

c'est ce que je reçois pour ne pas essayer cela en premier. J'ai trouvé un bug dans l'opérateur de timeout, j'ai mis à jour la réponse avec une solution de contournement pour le bogue. –

+0

Testé la solution de contournement et son fonctionnement. Je vous remercie! – jc211