2017-08-13 1 views
1

J'utilise RxCPP et j'ai des difficultés à comprendre son comportement.RxCPP se comporte différemment de Rx.Net

Voici deux programmes, l'un en Rx.Net et l'autre en RxCPP. Ils supposent produire les mêmes impressions, mais ils ne le font pas.
le programme prend des points à partir d'un flux de souris et calcule un flux de deltas entre les points.
la souris est un flux de flux de points, chaque coup - de bas en haut en appuyant sur un flux. la souris donne de tels flux les uns après les autres.

Dans ces essais la sortie attendue est la suivante:
Delta no 0 est: 0,0
Delta n ° 1 est la suivante: 5,0
Delta n ° 2 est la suivante: 0,5
Delta n ° 3 est la suivante: 2,3
Ce qui est ce que les sorties Rx.Net.
La sortie Rx.Cpp seulement la première ligne: Delta no 0 est: 0,0

Une idée?

exemple Rx.Cpp:

#include <rx.hpp> 
    namespace rx = rxcpp; 
    namespace rxsub = rxcpp::subjects; 
    using rxob = rx::observable<>; 

    struct Point 
    { 
     Point(int x, int y) : x(x), y(y) {} 

     int x = 0, y = 0; 
     Point operator-() const { return {-x, -y}; } 
     Point operator+(const Point& other) const { return Point{x + other.x, y + other.y}; } 
     Point operator-(const Point& other) const { return operator+(-other); } 
    }; 

    std::ostream& operator<<(std::ostream& o, const Point& p) 
    { 
     return o << "(" << p.x << ", " << p.y << ")"; 
    } 

    void TestRxCPP() 
    { 
     using RxPoint = rx::observable<Point>; 
     using Strokes = rx::observable<RxPoint>; 
     using StrokesSubject = rxsub::subject<RxPoint>; 

     StrokesSubject mouseSource; 
     auto strokes = mouseSource.get_observable(); 

     auto deltaVectors = [](Strokes strokes) { 
     auto deltas = strokes.flat_map([=](RxPoint stroke) { 
      auto points = stroke; 
      // create stream of delta vectors from start point 
      auto firstPoint = points.take(1); 
      auto delta = 
       points.combine_latest([](Point v0, Point v1) { return v0 - v1; }, firstPoint); 
      return delta; 
     }); 

     return deltas; 
     }; 

     auto delta = deltaVectors(strokes); 
     int n = 0; 
     delta.subscribe(
     [&](const Point& d) { std::cout << "Delta no. " << n++ << " is: " << d << std::endl; }); 

     auto testMouse = rxob::from(Point{3 + 0, 4 + 0}, Point{3 + 5, 4 + 0}, Point{3 + 0, 4 + 5}, Point{3 + 2, 4 + 3}); 
     mouseSource.get_subscriber().on_next(testMouse); 
    } 

Rx.Net exemple:

void RxNET() 
    { 
     var strokesS = new Subject<IObservable<Point>>(); 

     Func<IObservable<IObservable<Point>>, IObservable<Point>> 
     deltaVectors = strokes => 
     { 
      var deltas = strokes.SelectMany(stroke => 
      { 
       var points = stroke; 
       // create stream of delta vectors from start point 
       var firstPoint = points.Take(1); 
       var deltaP = 
        points.CombineLatest(firstPoint, (v0, v1) => new Point(v0.X - v1.X, v0.Y - v1.Y)); 
       return deltaP; 
      }); 

      return deltas; 
     }; 

     var delta = deltaVectors(strokesS); 
     var n = 0; 
     delta.Subscribe(d => { Console.WriteLine($"Delta no {n++} is: {d}\n"); }); 

     var testMouse = new List<Point> 
     { 
      new Point(3 + 0, 4 + 0), 
      new Point(3 + 5, 4 + 0), 
      new Point(3 + 0, 4 + 5), 
      new Point(3 + 2, 4 + 3) 
     }.ToObservable(); 
     strokesS.OnNext(testMouse); 
    } 
+0

Avez-vous essayé d'utiliser un débogueur ou incluant des instructions trace pour savoir où diverger les deux morceaux de fonctionnalité? –

+0

Il est très difficile (pour moi) de déboguer la bibliothèque rxcpp, son C++ très haut de gamme :-( – ShaulF

Répondre

0

Thanks to @Kirk Shoop at the rxcpp github :-)
c'est un comportement HOTvCOLD.

les traits sont COLD et sont en cours de partage et un seul thread est utilisé. le points.combine_latest(..., firstPoint) signifie que tous les points sont envoyés avant que firstPoint soit souscrit. ainsi seul le dernier delta est émis.

sources froide et chaude fonctionneront si vous inversez la combine_latest

auto delta = 
    firstPoint.combine_latest([](Point v0, Point v1) { return v1 - v0; }, points);