diff options
Diffstat (limited to 'Rx/v2/test/subjects/subject.cpp')
-rw-r--r-- | Rx/v2/test/subjects/subject.cpp | 745 |
1 files changed, 0 insertions, 745 deletions
diff --git a/Rx/v2/test/subjects/subject.cpp b/Rx/v2/test/subjects/subject.cpp deleted file mode 100644 index 09318a3..0000000 --- a/Rx/v2/test/subjects/subject.cpp +++ /dev/null @@ -1,745 +0,0 @@ -#define RXCPP_SUBJECT_TEST_ASYNC 1 - -#include "../test.h" - -#include <rxcpp/operators/rx-finally.hpp> - -#include <future> - - -const int static_onnextcalls = 10000000; -static int aliased = 0; - -SCENARIO("for loop locks mutex", "[!hide][for][mutex][long][perf]"){ - const int& onnextcalls = static_onnextcalls; - GIVEN("a for loop"){ - WHEN("locking mutex 100 million times"){ - using namespace std::chrono; - typedef steady_clock clock; - - int c = 0; - int n = 1; - auto start = clock::now(); - std::mutex m; - for (int i = 0; i < onnextcalls; i++) { - std::unique_lock<std::mutex> guard(m); - ++c; - } - auto finish = clock::now(); - auto msElapsed = duration_cast<milliseconds>(finish-start); - std::cout << "loop mutex : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; - - } - } -} - -namespace syncwithvoid { -template<class T, class OnNext> -class sync_subscriber -{ -public: - OnNext onnext; - bool issubscribed; - explicit sync_subscriber(OnNext on) - : onnext(on) - , issubscribed(true) - { - } - bool is_subscribed() {return issubscribed;} - void unsubscribe() {issubscribed = false;} - void on_next(T v) { - onnext(v); - } -}; -} -SCENARIO("for loop calls void on_next(int)", "[!hide][for][asyncobserver][baseline][perf]"){ - const int& onnextcalls = static_onnextcalls; - GIVEN("a for loop"){ - WHEN("calling on_next 100 million times"){ - using namespace std::chrono; - typedef steady_clock clock; - - auto c = std::addressof(aliased); - *c = 0; - int n = 1; - auto start = clock::now(); - auto onnext = [c](int){++*c;}; - syncwithvoid::sync_subscriber<int, decltype(onnext)> scbr(onnext); - for (int i = 0; i < onnextcalls && scbr.is_subscribed(); i++) { - scbr.on_next(i); - } - auto finish = clock::now(); - auto msElapsed = duration_cast<milliseconds>(finish-start); - std::cout << "loop void : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; - - } - } -} - -namespace asyncwithready { -// ready is an immutable class. -class ready -{ -public: - typedef std::function<void()> onthen_type; -private: - std::function<void(onthen_type)> setthen; -public: - ready() {} - ready(std::function<void(onthen_type)> st) : setthen(st) {} - bool is_ready() {return !setthen;} - void then(onthen_type ot) { - if (is_ready()) { - abort(); - } - setthen(ot); - } -}; -template<class T, class OnNext> -class async_subscriber -{ -public: - OnNext onnext; - bool issubscribed; - int count; - explicit async_subscriber(OnNext on) - : onnext(on) - , issubscribed(true) - , count(0) - { - } - bool is_subscribed() {return issubscribed;} - void unsubscribe() {issubscribed = false;} - ready on_next(T v) { - // push v onto queue - - // under some condition pop v off of queue and pass it on - onnext(v); - - // for demo purposes - // simulate queue full every 100000 items - if (count == 100000) { - // 'queue is full' - ready no([this](ready::onthen_type ot){ - // full version will sync producer and consumer (in producer push and consumer pop) - // and decide when to restart the producer - if (!this->count) { - ot(); - } - }); - // set queue empty since the demo has no separate consumer thread - count = 0; - // 'queue is empty' - return no; - } - static const ready yes; - return yes; - } -}; -} -SCENARIO("for loop calls ready on_next(int)", "[!hide][for][asyncobserver][ready][perf]"){ - static const int& onnextcalls = static_onnextcalls; - GIVEN("a for loop"){ - WHEN("calling on_next 100 million times"){ - using namespace std::chrono; - typedef steady_clock clock; - - auto c = std::addressof(aliased); - *c = 0; - int n = 1; - auto start = clock::now(); - auto onnext = [&c](int){++*c;}; - asyncwithready::async_subscriber<int, decltype(onnext)> scbr(onnext); - asyncwithready::ready::onthen_type chunk; - int i = 0; - chunk = [&chunk, scbr, i]() mutable { - for (; i < onnextcalls && scbr.is_subscribed(); i++) { - auto controller = scbr.on_next(i); - if (!controller.is_ready()) { - controller.then(chunk); - return; - } - } - }; - chunk(); - auto finish = clock::now(); - auto msElapsed = duration_cast<milliseconds>(finish-start); - std::cout << "loop ready : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; - - } - } -} - -namespace asyncwithfuture { -class unit {}; -template<class T, class OnNext> -class async_subscriber -{ -public: - OnNext onnext; - bool issubscribed; - explicit async_subscriber(OnNext on) - : onnext(on) - , issubscribed(true) - { - } - bool is_subscribed() {return issubscribed;} - void unsubscribe() {issubscribed = false;} - std::future<unit> on_next(T v) { - std::promise<unit> ready; - ready.set_value(unit()); - onnext(v); return ready.get_future();} -}; -} -SCENARIO("for loop calls std::future<unit> on_next(int)", "[!hide][for][asyncobserver][future][long][perf]"){ - const int& onnextcalls = static_onnextcalls; - GIVEN("a for loop"){ - WHEN("calling on_next 100 million times"){ - using namespace std::chrono; - typedef steady_clock clock; - - auto c = std::addressof(aliased); - *c = 0; - int n = 1; - auto start = clock::now(); - auto onnext = [&c](int){++*c;}; - asyncwithfuture::async_subscriber<int, decltype(onnext)> scbr(onnext); - for (int i = 0; i < onnextcalls && scbr.is_subscribed(); i++) { - auto isready = scbr.on_next(i); - if (isready.wait_for(std::chrono::milliseconds(0)) == std::future_status::timeout) { - isready.wait(); - } - } - auto finish = clock::now(); - auto msElapsed = duration_cast<milliseconds>(finish-start); - std::cout << "loop future<unit> : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; - - } - } -} - -SCENARIO("for loop calls observer", "[!hide][for][observer][perf]"){ - const int& onnextcalls = static_onnextcalls; - GIVEN("a for loop"){ - WHEN("observing 100 million ints"){ - using namespace std::chrono; - typedef steady_clock clock; - - static int& c = aliased; - int n = 1; - - c = 0; - auto start = clock::now(); - auto o = rx::make_observer<int>( - [](int){++c;}, - [](rxu::error_ptr){abort();}); - for (int i = 0; i < onnextcalls; i++) { - o.on_next(i); - } - o.on_completed(); - auto finish = clock::now(); - auto msElapsed = duration_cast<milliseconds>(finish-start); - std::cout << "loop -> observer : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec"<< std::endl; - } - } -} - -SCENARIO("for loop calls subscriber", "[!hide][for][subscriber][perf]"){ - const int& onnextcalls = static_onnextcalls; - GIVEN("a for loop"){ - WHEN("observing 100 million ints"){ - using namespace std::chrono; - typedef steady_clock clock; - - static int& c = aliased; - int n = 1; - - c = 0; - auto start = clock::now(); - auto o = rx::make_subscriber<int>( - [](int){++c;}, - [](rxu::error_ptr){abort();}); - for (int i = 0; i < onnextcalls && o.is_subscribed(); i++) { - o.on_next(i); - } - o.on_completed(); - auto finish = clock::now(); - auto msElapsed = duration_cast<milliseconds>(finish-start); - std::cout << "loop -> subscriber : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; - } - } -} - -SCENARIO("range calls subscriber", "[!hide][range][subscriber][perf]"){ - const int& onnextcalls = static_onnextcalls; - GIVEN("a range"){ - WHEN("observing 100 million ints"){ - using namespace std::chrono; - typedef steady_clock clock; - - static int& c = aliased; - int n = 1; - - c = 0; - auto start = clock::now(); - - rxs::range<int>(1, onnextcalls).subscribe( - [](int){ - ++c; - }, - [](rxu::error_ptr){abort();}); - - auto finish = clock::now(); - auto msElapsed = duration_cast<milliseconds>(finish-start); - std::cout << "range -> subscriber : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; - } - } -} - -SCENARIO("for loop calls subject", "[!hide][for][subject][subjects][long][perf]"){ - static const int& onnextcalls = static_onnextcalls; - GIVEN("a for loop and a subject"){ - WHEN("multicasting a million ints"){ - using namespace std::chrono; - typedef steady_clock clock; - - for (int n = 0; n < 10; n++) - { - auto p = std::make_shared<int>(0); - auto c = std::make_shared<int>(0); - rxsub::subject<int> sub; - -#if RXCPP_SUBJECT_TEST_ASYNC - std::vector<std::future<int>> f(n); - std::atomic<int> asyncUnsubscriptions{0}; -#endif - - auto o = sub.get_subscriber(); - - o.add(rx::make_subscription([c, n](){ - auto expected = n * onnextcalls; - REQUIRE(*c == expected); - })); - - for (int i = 0; i < n; i++) { -#if RXCPP_SUBJECT_TEST_ASYNC - f[i] = std::async([sub, o, &asyncUnsubscriptions]() { - auto source = sub.get_observable(); - while(o.is_subscribed()) { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - rx::composite_subscription cs; - source - .finally([&asyncUnsubscriptions](){ - ++asyncUnsubscriptions;}) - .subscribe( - rx::make_subscriber<int>( - cs, - [cs](int){ - cs.unsubscribe(); - }, - [](rxu::error_ptr){abort();})); - } - return 0; - }); -#endif - sub.get_observable().subscribe( - [c, p](int){ - ++(*c); - }, - [](rxu::error_ptr){abort();}); - } - - auto start = clock::now(); - for (int i = 0; i < onnextcalls && o.is_subscribed(); i++) { -#if RXCPP_DEBUG_SUBJECT_RACE - if (*p != *c) abort(); - (*p) += n; -#endif - o.on_next(i); - } - o.on_completed(); - auto finish = clock::now(); - auto msElapsed = duration_cast<milliseconds>(finish-start); - std::cout << "loop -> subject : " << n << " subscribed, " << std::setw(9) << (*c) << " on_next calls, "; -#if RXCPP_SUBJECT_TEST_ASYNC - std::cout << std::setw(4) << asyncUnsubscriptions << " async, "; -#endif - std::cout << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; - } - } - } -} - -SCENARIO("range calls subject", "[!hide][range][subject][subjects][long][perf]"){ - static const int& onnextcalls = static_onnextcalls; - GIVEN("a range and a subject"){ - WHEN("multicasting a million ints"){ - using namespace std::chrono; - typedef steady_clock clock; - for (int n = 0; n < 10; n++) - { - auto p = std::make_shared<int>(0); - auto c = std::make_shared<int>(0); - rxsub::subject<int> sub; - -#if RXCPP_SUBJECT_TEST_ASYNC - std::vector<std::future<int>> f(n); - std::atomic<int> asyncUnsubscriptions{0}; -#endif - - auto o = sub.get_subscriber(); - - o.add(rx::make_subscription([c, n](){ - auto expected = n * onnextcalls; - REQUIRE(*c == expected); - })); - - for (int i = 0; i < n; i++) { -#if RXCPP_SUBJECT_TEST_ASYNC - f[i] = std::async([sub, o, &asyncUnsubscriptions]() { - while(o.is_subscribed()) { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - rx::composite_subscription cs; - sub.get_observable() - .finally([&asyncUnsubscriptions](){ - ++asyncUnsubscriptions;}) - .subscribe(cs, - [cs](int){ - cs.unsubscribe(); - }, - [](rxu::error_ptr){abort();}); - } - return 0; - }); -#endif - sub.get_observable() - .subscribe( - [c, p](int){ - ++(*c); - }, - [](rxu::error_ptr){abort();} - ); - } - - auto start = clock::now(); - rxs::range<int>(1, onnextcalls) -#if RXCPP_DEBUG_SUBJECT_RACE - .filter([c, p, n](int){ - if (*p != *c) abort(); - (*p) += n; - return true; - }) -#endif - .subscribe(o); - auto finish = clock::now(); - auto msElapsed = duration_cast<milliseconds>(finish-start); - std::cout << "range -> subject : " << n << " subscribed, " << std::setw(9) << (*c) << " on_next calls, "; -#if RXCPP_SUBJECT_TEST_ASYNC - std::cout << std::setw(4) << asyncUnsubscriptions << " async, "; -#endif - std::cout << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec"<< std::endl; - } - } - } -} - - -SCENARIO("subject - infinite source", "[subject][subjects]"){ - GIVEN("a subject and an infinite source"){ - - auto sc = rxsc::make_test(); - auto w = sc.create_worker(); - const rxsc::test::messages<int> on; - const rxsc::test::messages<bool> check; - - auto xs = sc.make_hot_observable({ - on.next(70, 1), - on.next(110, 2), - on.next(220, 3), - on.next(270, 4), - on.next(340, 5), - on.next(410, 6), - on.next(520, 7), - on.next(630, 8), - on.next(710, 9), - on.next(870, 10), - on.next(940, 11), - on.next(1020, 12) - }); - - rxsub::subject<int> s; - - auto results1 = w.make_subscriber<int>(); - - auto results2 = w.make_subscriber<int>(); - - auto results3 = w.make_subscriber<int>(); - - WHEN("multicasting an infinite source"){ - - auto checks = rxu::to_vector({ - check.next(0, false) - }); - - auto record = [&s, &check, &checks](long at) -> void { - checks.push_back(check.next(at, s.has_observers())); - }; - - auto o = s.get_subscriber(); - - w.schedule_absolute(100, [&s, &o, &checks, &record](const rxsc::schedulable&){ - s = rxsub::subject<int>(); o = s.get_subscriber(); checks.clear(); record(100);}); - w.schedule_absolute(200, [&xs, &o, &record](const rxsc::schedulable&){ - xs.subscribe(o); record(200);}); - w.schedule_absolute(1000, [&o, &record](const rxsc::schedulable&){ - o.unsubscribe(); record(1000);}); - - w.schedule_absolute(300, [&s, &results1, &record](const rxsc::schedulable&){ - s.get_observable().subscribe(results1); record(300);}); - w.schedule_absolute(400, [&s, &results2, &record](const rxsc::schedulable&){ - s.get_observable().subscribe(results2); record(400);}); - w.schedule_absolute(900, [&s, &results3, &record](const rxsc::schedulable&){ - s.get_observable().subscribe(results3); record(900);}); - - w.schedule_absolute(600, [&results1, &record](const rxsc::schedulable&){ - results1.unsubscribe(); record(600);}); - w.schedule_absolute(700, [&results2, &record](const rxsc::schedulable&){ - results2.unsubscribe(); record(700);}); - w.schedule_absolute(800, [&results1, &record](const rxsc::schedulable&){ - results1.unsubscribe(); record(800);}); - w.schedule_absolute(950, [&results3, &record](const rxsc::schedulable&){ - results3.unsubscribe(); record(950);}); - - w.start(); - - THEN("result1 contains expected messages"){ - auto required = rxu::to_vector({ - on.next(340, 5), - on.next(410, 6), - on.next(520, 7) - }); - auto actual = results1.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("result2 contains expected messages"){ - auto required = rxu::to_vector({ - on.next(410, 6), - on.next(520, 7), - on.next(630, 8) - }); - auto actual = results2.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("result3 contains expected messages"){ - auto required = rxu::to_vector({ - on.next(940, 11) - }); - auto actual = results3.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("checks contains expected messages"){ - auto required = rxu::to_vector({ - check.next(100, false), - check.next(200, false), - check.next(300, true), - check.next(400, true), - check.next(600, true), - check.next(700, false), - check.next(800, false), - check.next(900, true), - check.next(950, false), - check.next(1000, false) - }); - auto actual = checks; - REQUIRE(required == actual); - } - - } - } -} - -SCENARIO("subject - finite source", "[subject][subjects]"){ - GIVEN("a subject and an finite source"){ - - auto sc = rxsc::make_test(); - auto w = sc.create_worker(); - const rxsc::test::messages<int> on; - - auto xs = sc.make_hot_observable({ - on.next(70, 1), - on.next(110, 2), - on.next(220, 3), - on.next(270, 4), - on.next(340, 5), - on.next(410, 6), - on.next(520, 7), - on.completed(630), - on.next(640, 9), - on.completed(650), - on.error(660, std::runtime_error("error on unsubscribed stream")) - }); - - rxsub::subject<int> s; - - auto results1 = w.make_subscriber<int>(); - - auto results2 = w.make_subscriber<int>(); - - auto results3 = w.make_subscriber<int>(); - - WHEN("multicasting an infinite source"){ - - auto o = s.get_subscriber(); - - w.schedule_absolute(100, [&s, &o](const rxsc::schedulable&){ - s = rxsub::subject<int>(); o = s.get_subscriber();}); - w.schedule_absolute(200, [&xs, &o](const rxsc::schedulable&){ - xs.subscribe(o);}); - w.schedule_absolute(1000, [&o](const rxsc::schedulable&){ - o.unsubscribe();}); - - w.schedule_absolute(300, [&s, &results1](const rxsc::schedulable&){ - s.get_observable().subscribe(results1);}); - w.schedule_absolute(400, [&s, &results2](const rxsc::schedulable&){ - s.get_observable().subscribe(results2);}); - w.schedule_absolute(900, [&s, &results3](const rxsc::schedulable&){ - s.get_observable().subscribe(results3);}); - - w.schedule_absolute(600, [&results1](const rxsc::schedulable&){ - results1.unsubscribe();}); - w.schedule_absolute(700, [&results2](const rxsc::schedulable&){ - results2.unsubscribe();}); - w.schedule_absolute(800, [&results1](const rxsc::schedulable&){ - results1.unsubscribe();}); - w.schedule_absolute(950, [&results3](const rxsc::schedulable&){ - results3.unsubscribe();}); - - w.start(); - - THEN("result1 contains expected messages"){ - auto required = rxu::to_vector({ - on.next(340, 5), - on.next(410, 6), - on.next(520, 7) - }); - auto actual = results1.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("result2 contains expected messages"){ - auto required = rxu::to_vector({ - on.next(410, 6), - on.next(520, 7), - on.completed(630) - }); - auto actual = results2.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("result3 contains expected messages"){ - auto required = rxu::to_vector({ - on.completed(900) - }); - auto actual = results3.get_observer().messages(); - REQUIRE(required == actual); - } - - } - } -} - - -SCENARIO("subject - on_error in source", "[subject][subjects]"){ - GIVEN("a subject and a source with an error"){ - - auto sc = rxsc::make_test(); - auto w = sc.create_worker(); - const rxsc::test::messages<int> on; - - std::runtime_error ex("subject on_error in stream"); - - auto xs = sc.make_hot_observable({ - on.next(70, 1), - on.next(110, 2), - on.next(220, 3), - on.next(270, 4), - on.next(340, 5), - on.next(410, 6), - on.next(520, 7), - on.error(630, ex), - on.next(640, 9), - on.completed(650), - on.error(660, std::runtime_error("error on unsubscribed stream")) - }); - - rxsub::subject<int> s; - - auto results1 = w.make_subscriber<int>(); - - auto results2 = w.make_subscriber<int>(); - - auto results3 = w.make_subscriber<int>(); - - WHEN("multicasting an infinite source"){ - - auto o = s.get_subscriber(); - - w.schedule_absolute(100, [&s, &o](const rxsc::schedulable&){ - s = rxsub::subject<int>(); o = s.get_subscriber();}); - w.schedule_absolute(200, [&xs, &o](const rxsc::schedulable&){ - xs.subscribe(o);}); - w.schedule_absolute(1000, [&o](const rxsc::schedulable&){ - o.unsubscribe();}); - - w.schedule_absolute(300, [&s, &results1](const rxsc::schedulable&){ - s.get_observable().subscribe(results1);}); - w.schedule_absolute(400, [&s, &results2](const rxsc::schedulable&){ - s.get_observable().subscribe(results2);}); - w.schedule_absolute(900, [&s, &results3](const rxsc::schedulable&){ - s.get_observable().subscribe(results3);}); - - w.schedule_absolute(600, [&results1](const rxsc::schedulable&){ - results1.unsubscribe();}); - w.schedule_absolute(700, [&results2](const rxsc::schedulable&){ - results2.unsubscribe();}); - w.schedule_absolute(800, [&results1](const rxsc::schedulable&){ - results1.unsubscribe();}); - w.schedule_absolute(950, [&results3](const rxsc::schedulable&){ - results3.unsubscribe();}); - - w.start(); - - THEN("result1 contains expected messages"){ - auto required = rxu::to_vector({ - on.next(340, 5), - on.next(410, 6), - on.next(520, 7) - }); - auto actual = results1.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("result2 contains expected messages"){ - auto required = rxu::to_vector({ - on.next(410, 6), - on.next(520, 7), - on.error(630, ex) - }); - auto actual = results2.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("result3 contains expected messages"){ - auto required = rxu::to_vector({ - on.error(900, ex) - }); - auto actual = results3.get_observer().messages(); - REQUIRE(required == actual); - } - - } - } -} |