summaryrefslogtreecommitdiff
path: root/Rx/v2/test/operators/lift.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/v2/test/operators/lift.cpp')
-rw-r--r--Rx/v2/test/operators/lift.cpp287
1 files changed, 0 insertions, 287 deletions
diff --git a/Rx/v2/test/operators/lift.cpp b/Rx/v2/test/operators/lift.cpp
deleted file mode 100644
index 1f58c1d..0000000
--- a/Rx/v2/test/operators/lift.cpp
+++ /dev/null
@@ -1,287 +0,0 @@
-#include "../test.h"
-
-namespace detail {
-
-template<class Predicate>
-struct liftfilter
-{
- typedef typename std::decay<Predicate>::type test_type;
- test_type test;
-
- liftfilter(test_type t)
- : test(t)
- {
- }
-
- template<class Subscriber>
- struct filter_observer : public rx::observer_base<typename std::decay<Subscriber>::type::value_type>
- {
- typedef filter_observer<Subscriber> this_type;
- typedef rx::observer_base<typename std::decay<Subscriber>::type::value_type> base_type;
- typedef typename base_type::value_type value_type;
- typedef typename std::decay<Subscriber>::type dest_type;
- typedef rx::observer<value_type, this_type> observer_type;
- dest_type dest;
- test_type test;
-
- filter_observer(dest_type d, test_type t)
- : dest(d)
- , test(t)
- {
- }
- void on_next(typename dest_type::value_type v) const {
- bool filtered = false;
- RXCPP_TRY {
- filtered = !test(v);
- } RXCPP_CATCH(...) {
- dest.on_error(rxu::current_exception());
- return;
- }
- if (!filtered) {
- dest.on_next(v);
- }
- }
- void on_error(rxu::error_ptr e) const {
- dest.on_error(e);
- }
- void on_completed() const {
- dest.on_completed();
- }
-
- static rx::subscriber<value_type, observer_type> make(const dest_type& d, const test_type& t) {
- return rx::make_subscriber<value_type>(d, observer_type(this_type(d, t)));
- }
- };
-
- template<class Subscriber>
- auto operator()(const Subscriber& dest) const
- -> decltype(filter_observer<Subscriber>::make(dest, test)) {
- return filter_observer<Subscriber>::make(dest, test);
- }
-};
-
-}
-
-namespace {
-
-template<class Predicate>
-auto liftfilter(Predicate&& p)
- -> detail::liftfilter<typename std::decay<Predicate>::type> {
- return detail::liftfilter<typename std::decay<Predicate>::type>(std::forward<Predicate>(p));
-}
-
-bool IsPrime(int x)
-{
- if (x < 2) return false;
- for (int i = 2; i <= x/2; ++i)
- {
- if (x % i == 0)
- return false;
- }
- return true;
-}
-
-}
-
-SCENARIO("lift liftfilter stops on disposal", "[where][filter][lift][operators]"){
- GIVEN("a test hot observable of ints"){
- auto sc = rxsc::make_test();
- auto w = sc.create_worker();
- const rxsc::test::messages<int> on;
-
- long invoked = 0;
-
- auto xs = sc.make_hot_observable({
- on.next(110, 1),
- on.next(180, 2),
- on.next(230, 3),
- on.next(270, 4),
- on.next(340, 5),
- on.next(380, 6),
- on.next(390, 7),
- on.next(450, 8),
- on.next(470, 9),
- on.next(560, 10),
- on.next(580, 11),
- on.completed(600)
- });
-
- WHEN("filtered to ints that are primes"){
-
- auto res = w.start(
- [&xs, &invoked]() {
- return xs
- .lift<int>(liftfilter([&invoked](int x) {
- invoked++;
- return IsPrime(x);
- }))
- // forget type to workaround lambda deduction bug on msvc 2013
- .as_dynamic();
- },
- 400
- );
-
- THEN("the output only contains primes that arrived before disposal"){
- auto required = rxu::to_vector({
- on.next(230, 3),
- on.next(340, 5),
- on.next(390, 7)
- });
- auto actual = res.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("there was one subscription and one unsubscription"){
- auto required = rxu::to_vector({
- on.subscribe(200, 400)
- });
- auto actual = xs.subscriptions();
- REQUIRE(required == actual);
- }
-
- THEN("where was called until disposed"){
- REQUIRE(5 == invoked);
- }
- }
- }
-}
-
-SCENARIO("stream lift liftfilter stops on disposal", "[where][filter][lift][stream][operators]"){
- GIVEN("a test hot observable of ints"){
- auto sc = rxsc::make_test();
- auto w = sc.create_worker();
- const rxsc::test::messages<int> on;
-
- long invoked = 0;
-
- auto xs = sc.make_hot_observable({
- on.next(110, 1),
- on.next(180, 2),
- on.next(230, 3),
- on.next(270, 4),
- on.next(340, 5),
- on.next(380, 6),
- on.next(390, 7),
- on.next(450, 8),
- on.next(470, 9),
- on.next(560, 10),
- on.next(580, 11),
- on.completed(600)
- });
-
- WHEN("filtered to ints that are primes"){
-
- auto res = w.start(
- [&xs, &invoked]() {
- return xs
- >> rxo::lift<int>(liftfilter([&invoked](int x) {
- invoked++;
- return IsPrime(x);
- }))
- // forget type to workaround lambda deduction bug on msvc 2013
- >> rxo::as_dynamic();
- },
- 400
- );
-
- THEN("the output only contains primes that arrived before disposal"){
- auto required = rxu::to_vector({
- on.next(230, 3),
- on.next(340, 5),
- on.next(390, 7)
- });
- auto actual = res.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("there was one subscription and one unsubscription"){
- auto required = rxu::to_vector({
- on.subscribe(200, 400)
- });
- auto actual = xs.subscriptions();
- REQUIRE(required == actual);
- }
-
- THEN("where was called until disposed"){
- REQUIRE(5 == invoked);
- }
- }
- }
-}
-
-SCENARIO("lift lambda filter stops on disposal", "[where][filter][lift][lambda][operators]"){
- GIVEN("a test hot observable of ints"){
- auto sc = rxsc::make_test();
- auto w = sc.create_worker();
- const rxsc::test::messages<int> on;
-
- long invoked = 0;
-
- auto xs = sc.make_hot_observable({
- on.next(110, 1),
- on.next(180, 2),
- on.next(230, 3),
- on.next(270, 4),
- on.next(340, 5),
- on.next(380, 6),
- on.next(390, 7),
- on.next(450, 8),
- on.next(470, 9),
- on.next(560, 10),
- on.next(580, 11),
- on.completed(600)
- });
-
- WHEN("filtered to ints that are primes"){
-
- auto res = w.start(
- [&xs, &invoked]() {
- auto predicate = [&](int x){
- invoked++;
- return IsPrime(x);
- };
- return xs
- .lift<int>([=](rx::subscriber<int> dest){
- // VS2013 deduction issue requires dynamic (type-forgetting)
- return rx::make_subscriber<int>(
- dest,
- rx::make_observer_dynamic<int>(
- [=](int n){
- bool pass = false;
- RXCPP_TRY {pass = predicate(n);} RXCPP_CATCH(...){dest.on_error(rxu::current_exception());};
- if (pass) {dest.on_next(n);}
- },
- [=](rxu::error_ptr e){dest.on_error(e);},
- [=](){dest.on_completed();}));
- })
- // forget type to workaround lambda deduction bug on msvc 2013
- .as_dynamic();
- },
- 400
- );
-
- THEN("the output only contains primes that arrived before disposal"){
- auto required = rxu::to_vector({
- on.next(230, 3),
- on.next(340, 5),
- on.next(390, 7)
- });
- auto actual = res.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("there was one subscription and one unsubscription"){
- auto required = rxu::to_vector({
- on.subscribe(200, 400)
- });
- auto actual = xs.subscriptions();
- REQUIRE(required == actual);
- }
-
- THEN("where was called until disposed"){
- REQUIRE(5 == invoked);
- }
- }
- }
-}
-