diff options
Diffstat (limited to 'Rx/v2/test/operators/lift.cpp')
-rw-r--r-- | Rx/v2/test/operators/lift.cpp | 287 |
1 files changed, 0 insertions, 287 deletions
diff --git a/Rx/v2/test/operators/lift.cpp b/Rx/v2/test/operators/lift.cpp deleted file mode 100644 index 1f58c1d..0000000 --- a/Rx/v2/test/operators/lift.cpp +++ /dev/null @@ -1,287 +0,0 @@ -#include "../test.h" - -namespace detail { - -template<class Predicate> -struct liftfilter -{ - typedef typename std::decay<Predicate>::type test_type; - test_type test; - - liftfilter(test_type t) - : test(t) - { - } - - template<class Subscriber> - struct filter_observer : public rx::observer_base<typename std::decay<Subscriber>::type::value_type> - { - typedef filter_observer<Subscriber> this_type; - typedef rx::observer_base<typename std::decay<Subscriber>::type::value_type> base_type; - typedef typename base_type::value_type value_type; - typedef typename std::decay<Subscriber>::type dest_type; - typedef rx::observer<value_type, this_type> observer_type; - dest_type dest; - test_type test; - - filter_observer(dest_type d, test_type t) - : dest(d) - , test(t) - { - } - void on_next(typename dest_type::value_type v) const { - bool filtered = false; - RXCPP_TRY { - filtered = !test(v); - } RXCPP_CATCH(...) { - dest.on_error(rxu::current_exception()); - return; - } - if (!filtered) { - dest.on_next(v); - } - } - void on_error(rxu::error_ptr e) const { - dest.on_error(e); - } - void on_completed() const { - dest.on_completed(); - } - - static rx::subscriber<value_type, observer_type> make(const dest_type& d, const test_type& t) { - return rx::make_subscriber<value_type>(d, observer_type(this_type(d, t))); - } - }; - - template<class Subscriber> - auto operator()(const Subscriber& dest) const - -> decltype(filter_observer<Subscriber>::make(dest, test)) { - return filter_observer<Subscriber>::make(dest, test); - } -}; - -} - -namespace { - -template<class Predicate> -auto liftfilter(Predicate&& p) - -> detail::liftfilter<typename std::decay<Predicate>::type> { - return detail::liftfilter<typename std::decay<Predicate>::type>(std::forward<Predicate>(p)); -} - -bool IsPrime(int x) -{ - if (x < 2) return false; - for (int i = 2; i <= x/2; ++i) - { - if (x % i == 0) - return false; - } - return true; -} - -} - -SCENARIO("lift liftfilter stops on disposal", "[where][filter][lift][operators]"){ - GIVEN("a test hot observable of ints"){ - auto sc = rxsc::make_test(); - auto w = sc.create_worker(); - const rxsc::test::messages<int> on; - - long invoked = 0; - - auto xs = sc.make_hot_observable({ - on.next(110, 1), - on.next(180, 2), - on.next(230, 3), - on.next(270, 4), - on.next(340, 5), - on.next(380, 6), - on.next(390, 7), - on.next(450, 8), - on.next(470, 9), - on.next(560, 10), - on.next(580, 11), - on.completed(600) - }); - - WHEN("filtered to ints that are primes"){ - - auto res = w.start( - [&xs, &invoked]() { - return xs - .lift<int>(liftfilter([&invoked](int x) { - invoked++; - return IsPrime(x); - })) - // forget type to workaround lambda deduction bug on msvc 2013 - .as_dynamic(); - }, - 400 - ); - - THEN("the output only contains primes that arrived before disposal"){ - auto required = rxu::to_vector({ - on.next(230, 3), - on.next(340, 5), - on.next(390, 7) - }); - auto actual = res.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("there was one subscription and one unsubscription"){ - auto required = rxu::to_vector({ - on.subscribe(200, 400) - }); - auto actual = xs.subscriptions(); - REQUIRE(required == actual); - } - - THEN("where was called until disposed"){ - REQUIRE(5 == invoked); - } - } - } -} - -SCENARIO("stream lift liftfilter stops on disposal", "[where][filter][lift][stream][operators]"){ - GIVEN("a test hot observable of ints"){ - auto sc = rxsc::make_test(); - auto w = sc.create_worker(); - const rxsc::test::messages<int> on; - - long invoked = 0; - - auto xs = sc.make_hot_observable({ - on.next(110, 1), - on.next(180, 2), - on.next(230, 3), - on.next(270, 4), - on.next(340, 5), - on.next(380, 6), - on.next(390, 7), - on.next(450, 8), - on.next(470, 9), - on.next(560, 10), - on.next(580, 11), - on.completed(600) - }); - - WHEN("filtered to ints that are primes"){ - - auto res = w.start( - [&xs, &invoked]() { - return xs - >> rxo::lift<int>(liftfilter([&invoked](int x) { - invoked++; - return IsPrime(x); - })) - // forget type to workaround lambda deduction bug on msvc 2013 - >> rxo::as_dynamic(); - }, - 400 - ); - - THEN("the output only contains primes that arrived before disposal"){ - auto required = rxu::to_vector({ - on.next(230, 3), - on.next(340, 5), - on.next(390, 7) - }); - auto actual = res.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("there was one subscription and one unsubscription"){ - auto required = rxu::to_vector({ - on.subscribe(200, 400) - }); - auto actual = xs.subscriptions(); - REQUIRE(required == actual); - } - - THEN("where was called until disposed"){ - REQUIRE(5 == invoked); - } - } - } -} - -SCENARIO("lift lambda filter stops on disposal", "[where][filter][lift][lambda][operators]"){ - GIVEN("a test hot observable of ints"){ - auto sc = rxsc::make_test(); - auto w = sc.create_worker(); - const rxsc::test::messages<int> on; - - long invoked = 0; - - auto xs = sc.make_hot_observable({ - on.next(110, 1), - on.next(180, 2), - on.next(230, 3), - on.next(270, 4), - on.next(340, 5), - on.next(380, 6), - on.next(390, 7), - on.next(450, 8), - on.next(470, 9), - on.next(560, 10), - on.next(580, 11), - on.completed(600) - }); - - WHEN("filtered to ints that are primes"){ - - auto res = w.start( - [&xs, &invoked]() { - auto predicate = [&](int x){ - invoked++; - return IsPrime(x); - }; - return xs - .lift<int>([=](rx::subscriber<int> dest){ - // VS2013 deduction issue requires dynamic (type-forgetting) - return rx::make_subscriber<int>( - dest, - rx::make_observer_dynamic<int>( - [=](int n){ - bool pass = false; - RXCPP_TRY {pass = predicate(n);} RXCPP_CATCH(...){dest.on_error(rxu::current_exception());}; - if (pass) {dest.on_next(n);} - }, - [=](rxu::error_ptr e){dest.on_error(e);}, - [=](){dest.on_completed();})); - }) - // forget type to workaround lambda deduction bug on msvc 2013 - .as_dynamic(); - }, - 400 - ); - - THEN("the output only contains primes that arrived before disposal"){ - auto required = rxu::to_vector({ - on.next(230, 3), - on.next(340, 5), - on.next(390, 7) - }); - auto actual = res.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("there was one subscription and one unsubscription"){ - auto required = rxu::to_vector({ - on.subscribe(200, 400) - }); - auto actual = xs.subscriptions(); - REQUIRE(required == actual); - } - - THEN("where was called until disposed"){ - REQUIRE(5 == invoked); - } - } - } -} - |