summaryrefslogtreecommitdiff
path: root/Rx/v2/test/subjects/subject.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/v2/test/subjects/subject.cpp')
-rw-r--r--Rx/v2/test/subjects/subject.cpp745
1 files changed, 0 insertions, 745 deletions
diff --git a/Rx/v2/test/subjects/subject.cpp b/Rx/v2/test/subjects/subject.cpp
deleted file mode 100644
index 09318a3..0000000
--- a/Rx/v2/test/subjects/subject.cpp
+++ /dev/null
@@ -1,745 +0,0 @@
-#define RXCPP_SUBJECT_TEST_ASYNC 1
-
-#include "../test.h"
-
-#include <rxcpp/operators/rx-finally.hpp>
-
-#include <future>
-
-
-const int static_onnextcalls = 10000000;
-static int aliased = 0;
-
-SCENARIO("for loop locks mutex", "[!hide][for][mutex][long][perf]"){
- const int& onnextcalls = static_onnextcalls;
- GIVEN("a for loop"){
- WHEN("locking mutex 100 million times"){
- using namespace std::chrono;
- typedef steady_clock clock;
-
- int c = 0;
- int n = 1;
- auto start = clock::now();
- std::mutex m;
- for (int i = 0; i < onnextcalls; i++) {
- std::unique_lock<std::mutex> guard(m);
- ++c;
- }
- auto finish = clock::now();
- auto msElapsed = duration_cast<milliseconds>(finish-start);
- std::cout << "loop mutex : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
-
- }
- }
-}
-
-namespace syncwithvoid {
-template<class T, class OnNext>
-class sync_subscriber
-{
-public:
- OnNext onnext;
- bool issubscribed;
- explicit sync_subscriber(OnNext on)
- : onnext(on)
- , issubscribed(true)
- {
- }
- bool is_subscribed() {return issubscribed;}
- void unsubscribe() {issubscribed = false;}
- void on_next(T v) {
- onnext(v);
- }
-};
-}
-SCENARIO("for loop calls void on_next(int)", "[!hide][for][asyncobserver][baseline][perf]"){
- const int& onnextcalls = static_onnextcalls;
- GIVEN("a for loop"){
- WHEN("calling on_next 100 million times"){
- using namespace std::chrono;
- typedef steady_clock clock;
-
- auto c = std::addressof(aliased);
- *c = 0;
- int n = 1;
- auto start = clock::now();
- auto onnext = [c](int){++*c;};
- syncwithvoid::sync_subscriber<int, decltype(onnext)> scbr(onnext);
- for (int i = 0; i < onnextcalls && scbr.is_subscribed(); i++) {
- scbr.on_next(i);
- }
- auto finish = clock::now();
- auto msElapsed = duration_cast<milliseconds>(finish-start);
- std::cout << "loop void : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
-
- }
- }
-}
-
-namespace asyncwithready {
-// ready is an immutable class.
-class ready
-{
-public:
- typedef std::function<void()> onthen_type;
-private:
- std::function<void(onthen_type)> setthen;
-public:
- ready() {}
- ready(std::function<void(onthen_type)> st) : setthen(st) {}
- bool is_ready() {return !setthen;}
- void then(onthen_type ot) {
- if (is_ready()) {
- abort();
- }
- setthen(ot);
- }
-};
-template<class T, class OnNext>
-class async_subscriber
-{
-public:
- OnNext onnext;
- bool issubscribed;
- int count;
- explicit async_subscriber(OnNext on)
- : onnext(on)
- , issubscribed(true)
- , count(0)
- {
- }
- bool is_subscribed() {return issubscribed;}
- void unsubscribe() {issubscribed = false;}
- ready on_next(T v) {
- // push v onto queue
-
- // under some condition pop v off of queue and pass it on
- onnext(v);
-
- // for demo purposes
- // simulate queue full every 100000 items
- if (count == 100000) {
- // 'queue is full'
- ready no([this](ready::onthen_type ot){
- // full version will sync producer and consumer (in producer push and consumer pop)
- // and decide when to restart the producer
- if (!this->count) {
- ot();
- }
- });
- // set queue empty since the demo has no separate consumer thread
- count = 0;
- // 'queue is empty'
- return no;
- }
- static const ready yes;
- return yes;
- }
-};
-}
-SCENARIO("for loop calls ready on_next(int)", "[!hide][for][asyncobserver][ready][perf]"){
- static const int& onnextcalls = static_onnextcalls;
- GIVEN("a for loop"){
- WHEN("calling on_next 100 million times"){
- using namespace std::chrono;
- typedef steady_clock clock;
-
- auto c = std::addressof(aliased);
- *c = 0;
- int n = 1;
- auto start = clock::now();
- auto onnext = [&c](int){++*c;};
- asyncwithready::async_subscriber<int, decltype(onnext)> scbr(onnext);
- asyncwithready::ready::onthen_type chunk;
- int i = 0;
- chunk = [&chunk, scbr, i]() mutable {
- for (; i < onnextcalls && scbr.is_subscribed(); i++) {
- auto controller = scbr.on_next(i);
- if (!controller.is_ready()) {
- controller.then(chunk);
- return;
- }
- }
- };
- chunk();
- auto finish = clock::now();
- auto msElapsed = duration_cast<milliseconds>(finish-start);
- std::cout << "loop ready : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
-
- }
- }
-}
-
-namespace asyncwithfuture {
-class unit {};
-template<class T, class OnNext>
-class async_subscriber
-{
-public:
- OnNext onnext;
- bool issubscribed;
- explicit async_subscriber(OnNext on)
- : onnext(on)
- , issubscribed(true)
- {
- }
- bool is_subscribed() {return issubscribed;}
- void unsubscribe() {issubscribed = false;}
- std::future<unit> on_next(T v) {
- std::promise<unit> ready;
- ready.set_value(unit());
- onnext(v); return ready.get_future();}
-};
-}
-SCENARIO("for loop calls std::future<unit> on_next(int)", "[!hide][for][asyncobserver][future][long][perf]"){
- const int& onnextcalls = static_onnextcalls;
- GIVEN("a for loop"){
- WHEN("calling on_next 100 million times"){
- using namespace std::chrono;
- typedef steady_clock clock;
-
- auto c = std::addressof(aliased);
- *c = 0;
- int n = 1;
- auto start = clock::now();
- auto onnext = [&c](int){++*c;};
- asyncwithfuture::async_subscriber<int, decltype(onnext)> scbr(onnext);
- for (int i = 0; i < onnextcalls && scbr.is_subscribed(); i++) {
- auto isready = scbr.on_next(i);
- if (isready.wait_for(std::chrono::milliseconds(0)) == std::future_status::timeout) {
- isready.wait();
- }
- }
- auto finish = clock::now();
- auto msElapsed = duration_cast<milliseconds>(finish-start);
- std::cout << "loop future<unit> : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
-
- }
- }
-}
-
-SCENARIO("for loop calls observer", "[!hide][for][observer][perf]"){
- const int& onnextcalls = static_onnextcalls;
- GIVEN("a for loop"){
- WHEN("observing 100 million ints"){
- using namespace std::chrono;
- typedef steady_clock clock;
-
- static int& c = aliased;
- int n = 1;
-
- c = 0;
- auto start = clock::now();
- auto o = rx::make_observer<int>(
- [](int){++c;},
- [](rxu::error_ptr){abort();});
- for (int i = 0; i < onnextcalls; i++) {
- o.on_next(i);
- }
- o.on_completed();
- auto finish = clock::now();
- auto msElapsed = duration_cast<milliseconds>(finish-start);
- std::cout << "loop -> observer : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec"<< std::endl;
- }
- }
-}
-
-SCENARIO("for loop calls subscriber", "[!hide][for][subscriber][perf]"){
- const int& onnextcalls = static_onnextcalls;
- GIVEN("a for loop"){
- WHEN("observing 100 million ints"){
- using namespace std::chrono;
- typedef steady_clock clock;
-
- static int& c = aliased;
- int n = 1;
-
- c = 0;
- auto start = clock::now();
- auto o = rx::make_subscriber<int>(
- [](int){++c;},
- [](rxu::error_ptr){abort();});
- for (int i = 0; i < onnextcalls && o.is_subscribed(); i++) {
- o.on_next(i);
- }
- o.on_completed();
- auto finish = clock::now();
- auto msElapsed = duration_cast<milliseconds>(finish-start);
- std::cout << "loop -> subscriber : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
- }
- }
-}
-
-SCENARIO("range calls subscriber", "[!hide][range][subscriber][perf]"){
- const int& onnextcalls = static_onnextcalls;
- GIVEN("a range"){
- WHEN("observing 100 million ints"){
- using namespace std::chrono;
- typedef steady_clock clock;
-
- static int& c = aliased;
- int n = 1;
-
- c = 0;
- auto start = clock::now();
-
- rxs::range<int>(1, onnextcalls).subscribe(
- [](int){
- ++c;
- },
- [](rxu::error_ptr){abort();});
-
- auto finish = clock::now();
- auto msElapsed = duration_cast<milliseconds>(finish-start);
- std::cout << "range -> subscriber : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
- }
- }
-}
-
-SCENARIO("for loop calls subject", "[!hide][for][subject][subjects][long][perf]"){
- static const int& onnextcalls = static_onnextcalls;
- GIVEN("a for loop and a subject"){
- WHEN("multicasting a million ints"){
- using namespace std::chrono;
- typedef steady_clock clock;
-
- for (int n = 0; n < 10; n++)
- {
- auto p = std::make_shared<int>(0);
- auto c = std::make_shared<int>(0);
- rxsub::subject<int> sub;
-
-#if RXCPP_SUBJECT_TEST_ASYNC
- std::vector<std::future<int>> f(n);
- std::atomic<int> asyncUnsubscriptions{0};
-#endif
-
- auto o = sub.get_subscriber();
-
- o.add(rx::make_subscription([c, n](){
- auto expected = n * onnextcalls;
- REQUIRE(*c == expected);
- }));
-
- for (int i = 0; i < n; i++) {
-#if RXCPP_SUBJECT_TEST_ASYNC
- f[i] = std::async([sub, o, &asyncUnsubscriptions]() {
- auto source = sub.get_observable();
- while(o.is_subscribed()) {
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- rx::composite_subscription cs;
- source
- .finally([&asyncUnsubscriptions](){
- ++asyncUnsubscriptions;})
- .subscribe(
- rx::make_subscriber<int>(
- cs,
- [cs](int){
- cs.unsubscribe();
- },
- [](rxu::error_ptr){abort();}));
- }
- return 0;
- });
-#endif
- sub.get_observable().subscribe(
- [c, p](int){
- ++(*c);
- },
- [](rxu::error_ptr){abort();});
- }
-
- auto start = clock::now();
- for (int i = 0; i < onnextcalls && o.is_subscribed(); i++) {
-#if RXCPP_DEBUG_SUBJECT_RACE
- if (*p != *c) abort();
- (*p) += n;
-#endif
- o.on_next(i);
- }
- o.on_completed();
- auto finish = clock::now();
- auto msElapsed = duration_cast<milliseconds>(finish-start);
- std::cout << "loop -> subject : " << n << " subscribed, " << std::setw(9) << (*c) << " on_next calls, ";
-#if RXCPP_SUBJECT_TEST_ASYNC
- std::cout << std::setw(4) << asyncUnsubscriptions << " async, ";
-#endif
- std::cout << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
- }
- }
- }
-}
-
-SCENARIO("range calls subject", "[!hide][range][subject][subjects][long][perf]"){
- static const int& onnextcalls = static_onnextcalls;
- GIVEN("a range and a subject"){
- WHEN("multicasting a million ints"){
- using namespace std::chrono;
- typedef steady_clock clock;
- for (int n = 0; n < 10; n++)
- {
- auto p = std::make_shared<int>(0);
- auto c = std::make_shared<int>(0);
- rxsub::subject<int> sub;
-
-#if RXCPP_SUBJECT_TEST_ASYNC
- std::vector<std::future<int>> f(n);
- std::atomic<int> asyncUnsubscriptions{0};
-#endif
-
- auto o = sub.get_subscriber();
-
- o.add(rx::make_subscription([c, n](){
- auto expected = n * onnextcalls;
- REQUIRE(*c == expected);
- }));
-
- for (int i = 0; i < n; i++) {
-#if RXCPP_SUBJECT_TEST_ASYNC
- f[i] = std::async([sub, o, &asyncUnsubscriptions]() {
- while(o.is_subscribed()) {
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- rx::composite_subscription cs;
- sub.get_observable()
- .finally([&asyncUnsubscriptions](){
- ++asyncUnsubscriptions;})
- .subscribe(cs,
- [cs](int){
- cs.unsubscribe();
- },
- [](rxu::error_ptr){abort();});
- }
- return 0;
- });
-#endif
- sub.get_observable()
- .subscribe(
- [c, p](int){
- ++(*c);
- },
- [](rxu::error_ptr){abort();}
- );
- }
-
- auto start = clock::now();
- rxs::range<int>(1, onnextcalls)
-#if RXCPP_DEBUG_SUBJECT_RACE
- .filter([c, p, n](int){
- if (*p != *c) abort();
- (*p) += n;
- return true;
- })
-#endif
- .subscribe(o);
- auto finish = clock::now();
- auto msElapsed = duration_cast<milliseconds>(finish-start);
- std::cout << "range -> subject : " << n << " subscribed, " << std::setw(9) << (*c) << " on_next calls, ";
-#if RXCPP_SUBJECT_TEST_ASYNC
- std::cout << std::setw(4) << asyncUnsubscriptions << " async, ";
-#endif
- std::cout << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec"<< std::endl;
- }
- }
- }
-}
-
-
-SCENARIO("subject - infinite source", "[subject][subjects]"){
- GIVEN("a subject and an infinite source"){
-
- auto sc = rxsc::make_test();
- auto w = sc.create_worker();
- const rxsc::test::messages<int> on;
- const rxsc::test::messages<bool> check;
-
- auto xs = sc.make_hot_observable({
- on.next(70, 1),
- on.next(110, 2),
- on.next(220, 3),
- on.next(270, 4),
- on.next(340, 5),
- on.next(410, 6),
- on.next(520, 7),
- on.next(630, 8),
- on.next(710, 9),
- on.next(870, 10),
- on.next(940, 11),
- on.next(1020, 12)
- });
-
- rxsub::subject<int> s;
-
- auto results1 = w.make_subscriber<int>();
-
- auto results2 = w.make_subscriber<int>();
-
- auto results3 = w.make_subscriber<int>();
-
- WHEN("multicasting an infinite source"){
-
- auto checks = rxu::to_vector({
- check.next(0, false)
- });
-
- auto record = [&s, &check, &checks](long at) -> void {
- checks.push_back(check.next(at, s.has_observers()));
- };
-
- auto o = s.get_subscriber();
-
- w.schedule_absolute(100, [&s, &o, &checks, &record](const rxsc::schedulable&){
- s = rxsub::subject<int>(); o = s.get_subscriber(); checks.clear(); record(100);});
- w.schedule_absolute(200, [&xs, &o, &record](const rxsc::schedulable&){
- xs.subscribe(o); record(200);});
- w.schedule_absolute(1000, [&o, &record](const rxsc::schedulable&){
- o.unsubscribe(); record(1000);});
-
- w.schedule_absolute(300, [&s, &results1, &record](const rxsc::schedulable&){
- s.get_observable().subscribe(results1); record(300);});
- w.schedule_absolute(400, [&s, &results2, &record](const rxsc::schedulable&){
- s.get_observable().subscribe(results2); record(400);});
- w.schedule_absolute(900, [&s, &results3, &record](const rxsc::schedulable&){
- s.get_observable().subscribe(results3); record(900);});
-
- w.schedule_absolute(600, [&results1, &record](const rxsc::schedulable&){
- results1.unsubscribe(); record(600);});
- w.schedule_absolute(700, [&results2, &record](const rxsc::schedulable&){
- results2.unsubscribe(); record(700);});
- w.schedule_absolute(800, [&results1, &record](const rxsc::schedulable&){
- results1.unsubscribe(); record(800);});
- w.schedule_absolute(950, [&results3, &record](const rxsc::schedulable&){
- results3.unsubscribe(); record(950);});
-
- w.start();
-
- THEN("result1 contains expected messages"){
- auto required = rxu::to_vector({
- on.next(340, 5),
- on.next(410, 6),
- on.next(520, 7)
- });
- auto actual = results1.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("result2 contains expected messages"){
- auto required = rxu::to_vector({
- on.next(410, 6),
- on.next(520, 7),
- on.next(630, 8)
- });
- auto actual = results2.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("result3 contains expected messages"){
- auto required = rxu::to_vector({
- on.next(940, 11)
- });
- auto actual = results3.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("checks contains expected messages"){
- auto required = rxu::to_vector({
- check.next(100, false),
- check.next(200, false),
- check.next(300, true),
- check.next(400, true),
- check.next(600, true),
- check.next(700, false),
- check.next(800, false),
- check.next(900, true),
- check.next(950, false),
- check.next(1000, false)
- });
- auto actual = checks;
- REQUIRE(required == actual);
- }
-
- }
- }
-}
-
-SCENARIO("subject - finite source", "[subject][subjects]"){
- GIVEN("a subject and an finite source"){
-
- auto sc = rxsc::make_test();
- auto w = sc.create_worker();
- const rxsc::test::messages<int> on;
-
- auto xs = sc.make_hot_observable({
- on.next(70, 1),
- on.next(110, 2),
- on.next(220, 3),
- on.next(270, 4),
- on.next(340, 5),
- on.next(410, 6),
- on.next(520, 7),
- on.completed(630),
- on.next(640, 9),
- on.completed(650),
- on.error(660, std::runtime_error("error on unsubscribed stream"))
- });
-
- rxsub::subject<int> s;
-
- auto results1 = w.make_subscriber<int>();
-
- auto results2 = w.make_subscriber<int>();
-
- auto results3 = w.make_subscriber<int>();
-
- WHEN("multicasting an infinite source"){
-
- auto o = s.get_subscriber();
-
- w.schedule_absolute(100, [&s, &o](const rxsc::schedulable&){
- s = rxsub::subject<int>(); o = s.get_subscriber();});
- w.schedule_absolute(200, [&xs, &o](const rxsc::schedulable&){
- xs.subscribe(o);});
- w.schedule_absolute(1000, [&o](const rxsc::schedulable&){
- o.unsubscribe();});
-
- w.schedule_absolute(300, [&s, &results1](const rxsc::schedulable&){
- s.get_observable().subscribe(results1);});
- w.schedule_absolute(400, [&s, &results2](const rxsc::schedulable&){
- s.get_observable().subscribe(results2);});
- w.schedule_absolute(900, [&s, &results3](const rxsc::schedulable&){
- s.get_observable().subscribe(results3);});
-
- w.schedule_absolute(600, [&results1](const rxsc::schedulable&){
- results1.unsubscribe();});
- w.schedule_absolute(700, [&results2](const rxsc::schedulable&){
- results2.unsubscribe();});
- w.schedule_absolute(800, [&results1](const rxsc::schedulable&){
- results1.unsubscribe();});
- w.schedule_absolute(950, [&results3](const rxsc::schedulable&){
- results3.unsubscribe();});
-
- w.start();
-
- THEN("result1 contains expected messages"){
- auto required = rxu::to_vector({
- on.next(340, 5),
- on.next(410, 6),
- on.next(520, 7)
- });
- auto actual = results1.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("result2 contains expected messages"){
- auto required = rxu::to_vector({
- on.next(410, 6),
- on.next(520, 7),
- on.completed(630)
- });
- auto actual = results2.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("result3 contains expected messages"){
- auto required = rxu::to_vector({
- on.completed(900)
- });
- auto actual = results3.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- }
- }
-}
-
-
-SCENARIO("subject - on_error in source", "[subject][subjects]"){
- GIVEN("a subject and a source with an error"){
-
- auto sc = rxsc::make_test();
- auto w = sc.create_worker();
- const rxsc::test::messages<int> on;
-
- std::runtime_error ex("subject on_error in stream");
-
- auto xs = sc.make_hot_observable({
- on.next(70, 1),
- on.next(110, 2),
- on.next(220, 3),
- on.next(270, 4),
- on.next(340, 5),
- on.next(410, 6),
- on.next(520, 7),
- on.error(630, ex),
- on.next(640, 9),
- on.completed(650),
- on.error(660, std::runtime_error("error on unsubscribed stream"))
- });
-
- rxsub::subject<int> s;
-
- auto results1 = w.make_subscriber<int>();
-
- auto results2 = w.make_subscriber<int>();
-
- auto results3 = w.make_subscriber<int>();
-
- WHEN("multicasting an infinite source"){
-
- auto o = s.get_subscriber();
-
- w.schedule_absolute(100, [&s, &o](const rxsc::schedulable&){
- s = rxsub::subject<int>(); o = s.get_subscriber();});
- w.schedule_absolute(200, [&xs, &o](const rxsc::schedulable&){
- xs.subscribe(o);});
- w.schedule_absolute(1000, [&o](const rxsc::schedulable&){
- o.unsubscribe();});
-
- w.schedule_absolute(300, [&s, &results1](const rxsc::schedulable&){
- s.get_observable().subscribe(results1);});
- w.schedule_absolute(400, [&s, &results2](const rxsc::schedulable&){
- s.get_observable().subscribe(results2);});
- w.schedule_absolute(900, [&s, &results3](const rxsc::schedulable&){
- s.get_observable().subscribe(results3);});
-
- w.schedule_absolute(600, [&results1](const rxsc::schedulable&){
- results1.unsubscribe();});
- w.schedule_absolute(700, [&results2](const rxsc::schedulable&){
- results2.unsubscribe();});
- w.schedule_absolute(800, [&results1](const rxsc::schedulable&){
- results1.unsubscribe();});
- w.schedule_absolute(950, [&results3](const rxsc::schedulable&){
- results3.unsubscribe();});
-
- w.start();
-
- THEN("result1 contains expected messages"){
- auto required = rxu::to_vector({
- on.next(340, 5),
- on.next(410, 6),
- on.next(520, 7)
- });
- auto actual = results1.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("result2 contains expected messages"){
- auto required = rxu::to_vector({
- on.next(410, 6),
- on.next(520, 7),
- on.error(630, ex)
- });
- auto actual = results2.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("result3 contains expected messages"){
- auto required = rxu::to_vector({
- on.error(900, ex)
- });
- auto actual = results3.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- }
- }
-}