summaryrefslogtreecommitdiff
path: root/Rx/v2/test/operators/subscribe_on.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/v2/test/operators/subscribe_on.cpp')
-rw-r--r--Rx/v2/test/operators/subscribe_on.cpp180
1 files changed, 0 insertions, 180 deletions
diff --git a/Rx/v2/test/operators/subscribe_on.cpp b/Rx/v2/test/operators/subscribe_on.cpp
deleted file mode 100644
index ef8a8c7..0000000
--- a/Rx/v2/test/operators/subscribe_on.cpp
+++ /dev/null
@@ -1,180 +0,0 @@
-#include "../test.h"
-#include <rxcpp/operators/rx-reduce.hpp>
-#include <rxcpp/operators/rx-map.hpp>
-#include <rxcpp/operators/rx-subscribe_on.hpp>
-#include <rxcpp/operators/rx-observe_on.hpp>
-
-#include <sstream>
-
-static const int static_subscriptions = 50000;
-
-SCENARIO("for loop subscribes to map with subscribe_on and observe_on", "[!hide][for][just][subscribe][subscribe_on][observe_on][long][perf]"){
- const int& subscriptions = static_subscriptions;
- GIVEN("a for loop"){
- WHEN("subscribe 50K times"){
- using namespace std::chrono;
- typedef steady_clock clock;
-
- int runs = 10;
-
- for (;runs > 0; --runs) {
-
- int c = 0;
- int n = 1;
- auto start = clock::now();
- for (int i = 0; i < subscriptions; ++i) {
- c += rx::observable<>::just(1)
- .map([](int i) {
- std::stringstream serializer;
- serializer << i;
- return serializer.str();
- })
- .map([](const std::string& s) {
- int i;
- std::stringstream(s) >> i;
- return i;
- })
- .subscribe_on(rx::observe_on_event_loop())
- .observe_on(rx::observe_on_event_loop())
- .as_blocking()
- .count();
- }
- auto finish = clock::now();
- auto msElapsed = duration_cast<milliseconds>(finish-start);
- REQUIRE(subscriptions == c);
- std::cout << "loop subscribe map subscribe_on observe_on : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
- }
- }
- }
-}
-
-SCENARIO("for loop subscribes to map with subscribe_on", "[!hide][subscribe_on_only][for][just][subscribe][subscribe_on][long][perf]"){
- const int& subscriptions = static_subscriptions;
- GIVEN("a for loop"){
- WHEN("subscribe 50K times"){
- using namespace std::chrono;
- typedef steady_clock clock;
-
- int runs = 10;
-
- for (;runs > 0; --runs) {
-
- int c = 0;
- int n = 1;
- auto start = clock::now();
-
- for (int i = 0; i < subscriptions; ++i) {
- c += rx::observable<>::
- just(1).
- map([](int i) {
- std::stringstream serializer;
- serializer << i;
- return serializer.str();
- }).
- map([](const std::string& s) {
- int i;
- std::stringstream(s) >> i;
- return i;
- }).
- subscribe_on(rx::observe_on_event_loop()).
- as_blocking().
- count();
- }
- auto finish = clock::now();
- auto msElapsed = duration_cast<milliseconds>(finish-start);
- REQUIRE(subscriptions == c);
- std::cout << "loop subscribe map subscribe_on : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
- }
- }
- }
-}
-
-SCENARIO("subscribe_on", "[subscribe][subscribe_on]"){
- GIVEN("a source"){
- auto sc = rxsc::make_test();
- auto so = rx::synchronize_in_one_worker(sc);
- auto w = sc.create_worker();
- const rxsc::test::messages<int> on;
-
- auto xs = sc.make_hot_observable({
- on.next(150, 1),
- on.next(210, 2),
- on.next(240, 3),
- on.completed(300)
- });
-
- WHEN("subscribe_on is specified"){
-
- auto res = w.start(
- [so, xs]() {
- return xs
- .subscribe_on(so);
- }
- );
-
- THEN("the output contains items sent while subscribed"){
- auto required = rxu::to_vector({
- on.next(210, 2),
- on.next(240, 3),
- on.completed(300)
- });
- auto actual = res.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("there was 1 subscription/unsubscription to the source"){
- auto required = rxu::to_vector({
- on.subscribe(201, 300)
- });
- auto actual = xs.subscriptions();
- REQUIRE(required == actual);
- }
-
- }
- }
-}
-
-SCENARIO("stream subscribe_on", "[subscribe][subscribe_on]"){
- GIVEN("a source"){
- auto sc = rxsc::make_test();
- auto so = rx::synchronize_in_one_worker(sc);
- auto w = sc.create_worker();
- const rxsc::test::messages<int> on;
-
- auto xs = sc.make_hot_observable({
- on.next(150, 1),
- on.next(210, 2),
- on.next(240, 3),
- on.completed(300)
- });
-
- WHEN("subscribe_on is specified"){
-
- auto res = w.start(
- [so, xs]() {
- return xs
- | rxo::subscribe_on(so);
- }
- );
-
- THEN("the output contains items sent while subscribed"){
- auto required = rxu::to_vector({
- on.next(210, 2),
- on.next(240, 3),
- on.completed(300)
- });
- auto actual = res.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("there was 1 subscription/unsubscription to the source"){
- auto required = rxu::to_vector({
- on.subscribe(201, 300)
- });
- auto actual = xs.subscriptions();
- REQUIRE(required == actual);
- }
-
- }
- }
-}