summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIgor Murashkin <iam@google.com>2019-02-20 17:14:57 -0800
committerandroid-build-merger <android-build-merger@google.com>2019-02-20 17:14:57 -0800
commit22269e188318fe4fe6f97b7b09dea1562d954858 (patch)
tree391d723483a8ca67aaea9245bf8ce98cbe52c216
parentc6ee6b26d930e1e139d9ab2e0b1706d25a109543 (diff)
parent8ca4201d6786bd519d212f8ca076ca943c50bdbd (diff)
downloadRxCpp-22269e188318fe4fe6f97b7b09dea1562d954858.tar.gz
operators: Add ref_count(other) operator overload. am: 671e288a8a am: b03d6248d9
am: 8ca4201d67 Change-Id: I511597fbf53554f316eeb63cc52de72af8d38675
-rw-r--r--Rx/v2/examples/doxygen/publish.cpp95
-rw-r--r--Rx/v2/examples/doxygen/ref_count.cpp55
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-publish.hpp12
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-ref_count.hpp134
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp11
-rw-r--r--Rx/v2/test/operators/publish.cpp100
-rw-r--r--projects/doxygen/CMakeLists.txt1
7 files changed, 393 insertions, 15 deletions
diff --git a/Rx/v2/examples/doxygen/publish.cpp b/Rx/v2/examples/doxygen/publish.cpp
index d34e991..6c348a2 100644
--- a/Rx/v2/examples/doxygen/publish.cpp
+++ b/Rx/v2/examples/doxygen/publish.cpp
@@ -3,6 +3,9 @@
#include "rxcpp/rx-test.hpp"
#include "catch.hpp"
+#include <atomic>
+#include <array>
+
SCENARIO("publish_synchronized sample"){
printf("//! [publish_synchronized sample]\n");
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
@@ -95,3 +98,95 @@ SCENARIO("publish behavior sample"){
values.as_blocking().subscribe();
printf("//! [publish behavior sample]\n");
}
+
+SCENARIO("publish diamond bgthread sample"){
+ printf("//! [publish diamond bgthread sample]\n");
+
+ /*
+ * Implements the following diamond graph chain with publish+connect on a background thread.
+ *
+ * Values
+ * / \
+ * *2 *100
+ * \ /
+ * Merge
+ */
+ auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
+ take(5).
+ publish();
+
+ // Left side multiplies by 2.
+ auto left = values.map(
+ [](long v){printf("[1] OnNext: %ld -> %ld\n", v, v*2); return v * 2;} );
+
+ // Right side multiplies by 100.
+ auto right = values.map(
+ [](long v){printf("[2] OnNext: %ld -> %ld\n", v, v*100); return v * 100; });
+
+ // Merge the left,right sides together.
+ // The items are emitted interleaved ... [left1, right1, left2, right2, left3, right3, ...].
+ auto merged = left.merge(right);
+
+ std::atomic<bool> completed{false};
+
+ // Add subscription to see results
+ merged.subscribe(
+ [](long v) { printf("[3] OnNext: %ld\n", v); },
+ [&]() { printf("[3] OnCompleted:\n"); completed = true; });
+
+ // Start emitting
+ values.connect();
+
+ // Block until subscription terminates.
+ while (!completed) {}
+
+ // Note: consider using ref_count(other) in real code, it's more composable.
+
+ printf("//! [publish diamond bgthread sample]\n");
+}
+
+SCENARIO("publish diamond samethread sample"){
+ printf("//! [publish diamond samethread sample]\n");
+
+ /*
+ * Implements the following diamond graph chain with publish+connect diamond without using threads.
+ *
+ * Values
+ * / \
+ * *2 *100
+ * \ /
+ * Merge
+ */
+
+ std::array<int, 5> a={{1, 2, 3, 4, 5}};
+ auto values = rxcpp::observable<>::iterate(a).
+ publish();
+
+ // Left side multiplies by 2.
+ auto left = values.map(
+ [](long v){printf("[1] OnNext: %ld -> %ld\n", v, v*2); return v * 2;} );
+
+ // Right side multiplies by 100.
+ auto right = values.map(
+ [](long v){printf("[2] OnNext: %ld -> %ld\n", v, v*100); return v * 100; });
+
+ // Merge the left,right sides together.
+ // The items are emitted interleaved ... [left1, right1, left2, right2, left3, right3, ...].
+ auto merged = left.merge(right);
+
+ // Add subscription to see results
+ merged.subscribe(
+ [](long v) { printf("[3] OnNext: %ld\n", v); },
+ [&]() { printf("[3] OnCompleted:\n"); });
+
+ // Start emitting
+ // - because there are no other threads here, the connect call blocks until the source
+ // calls on_completed.
+ values.connect();
+
+ // Note: consider using ref_count(other) in real code, it's more composable.
+
+ printf("//! [publish diamond samethread sample]\n");
+}
+
+// see also examples/doxygen/ref_count.cpp for more diamond examples
diff --git a/Rx/v2/examples/doxygen/ref_count.cpp b/Rx/v2/examples/doxygen/ref_count.cpp
new file mode 100644
index 0000000..d056274
--- /dev/null
+++ b/Rx/v2/examples/doxygen/ref_count.cpp
@@ -0,0 +1,55 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+#include <array>
+
+SCENARIO("ref_count other diamond sample"){
+ printf("//! [ref_count other diamond sample]\n");
+
+ /*
+ * Implements the following diamond graph chain with publish+ref_count without using threads.
+ * This version is composable because it does not use connect explicitly.
+ *
+ * Values
+ * / \
+ * *2 *100
+ * \ /
+ * Merge
+ * |
+ * RefCount
+ */
+
+ std::array<double, 5> a={{1.0, 2.0, 3.0, 4.0, 5.0}};
+ auto values = rxcpp::observable<>::iterate(a)
+ // The root of the chain is only subscribed to once.
+ .tap([](double v) { printf("[0] OnNext: %lf\n", v); })
+ .publish();
+
+ auto values_to_long = values.map([](double v) { return (long) v; });
+
+ // Left side multiplies by 2.
+ auto left = values_to_long.map(
+ [](long v) -> long {printf("[1] OnNext: %ld -> %ld\n", v, v*2); return v * 2L;} );
+
+ // Right side multiplies by 100.
+ auto right = values_to_long.map(
+ [](long v) -> long {printf("[2] OnNext: %ld -> %ld\n", v, v*100); return v * 100L; });
+
+ // Merge the left,right sides together.
+ // The items are emitted interleaved ... [left1, right1, left2, right2, left3, right3, ...].
+ auto merged = left.merge(right);
+
+ // When this value is subscribed to, it calls connect on values.
+ auto connect_on_subscribe = merged.ref_count(values);
+
+ // This immediately starts emitting all values and blocks until they are completed.
+ connect_on_subscribe.subscribe(
+ [](long v) { printf("[3] OnNext: %ld\n", v); },
+ [&]() { printf("[3] OnCompleted:\n"); });
+
+ printf("//! [ref_count other diamond sample]\n");
+}
+
+// see also examples/doxygen/publish.cpp for non-ref_count diamonds
diff --git a/Rx/v2/src/rxcpp/operators/rx-publish.hpp b/Rx/v2/src/rxcpp/operators/rx-publish.hpp
index dc38191..bc686fc 100644
--- a/Rx/v2/src/rxcpp/operators/rx-publish.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-publish.hpp
@@ -21,6 +21,18 @@
\sample
\snippet publish.cpp publish behavior sample
\snippet output.txt publish behavior sample
+
+ \sample
+ \snippet publish.cpp publish diamond samethread sample
+ \snippet output.txt publish diamond samethread sample
+
+ \sample
+ \snippet publish.cpp publish diamond bgthread sample
+ \snippet output.txt publish diamond bgthread sample
+
+ \sample
+ \snippet ref_count.cpp ref_count other diamond sample
+ \snippet output.txt ref_count other diamond sample
*/
#if !defined(RXCPP_OPERATORS_RX_PUBLISH_HPP)
diff --git a/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp b/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp
index 55dde05..b68315d 100644
--- a/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp
@@ -4,10 +4,26 @@
/*! \file rx-ref_count.hpp
- \brief takes a connectable_observable source and uses a ref_count of the subscribers to control the connection to the published source.
- The first subscription will cause a call to connect() and the last unsubscribe will unsubscribe the connection.
+ \brief Make some \c connectable_observable behave like an ordinary \c observable.
+ Uses a reference count of the subscribers to control the connection to the published observable.
- \return An observable that emitting the items from its source.
+ The first subscription will cause a call to \c connect(), and the last \c unsubscribe will unsubscribe the connection.
+
+ There are 2 variants of the operator:
+ \li \c ref_count(): calls \c connect on the \c source \c connectable_observable.
+ \li \c ref_count(other): calls \c connect on the \c other \c connectable_observable.
+
+ \tparam ConnectableObservable the type of the \c other \c connectable_observable (optional)
+ \param other \c connectable_observable to call \c connect on (optional)
+
+ If \c other is omitted, then \c source is used instead (which must be a \c connectable_observable).
+ Otherwise, \c source can be a regular \c observable.
+
+ \return An \c observable that emits the items from its \c source.
+
+ \sample
+ \snippet ref_count.cpp ref_count other diamond sample
+ \snippet output.txt ref_count other diamond sample
*/
#if !defined(RXCPP_OPERATORS_RX_REF_COUNT_HPP)
@@ -30,29 +46,100 @@ struct ref_count_invalid : public rxo::operator_base<ref_count_invalid_arguments
};
template<class... AN>
using ref_count_invalid_t = typename ref_count_invalid<AN...>::type;
-
-template<class T, class ConnectableObservable>
+
+// ref_count(other) takes a regular observable source, not a connectable_observable.
+// use template specialization to avoid instantiating 'subscribe' for two different types
+// which would cause a compilation error.
+template <typename connectable_type, typename observable_type>
+struct ref_count_state_base {
+ ref_count_state_base(connectable_type other, observable_type source)
+ : connectable(std::move(other))
+ , subscribable(std::move(source)) {}
+
+ connectable_type connectable; // connects to this. subscribes to this if subscribable empty.
+ observable_type subscribable; // subscribes to this if non-empty.
+
+ template <typename Subscriber>
+ void subscribe(Subscriber&& o) {
+ subscribable.subscribe(std::forward<Subscriber>(o));
+ }
+};
+
+// Note: explicit specializations have to be at namespace scope prior to C++17.
+template <typename connectable_type>
+struct ref_count_state_base<connectable_type, void> {
+ explicit ref_count_state_base(connectable_type c)
+ : connectable(std::move(c)) {}
+
+ connectable_type connectable; // connects to this. subscribes to this if subscribable empty.
+
+ template <typename Subscriber>
+ void subscribe(Subscriber&& o) {
+ connectable.subscribe(std::forward<Subscriber>(o));
+ }
+};
+
+template<class T,
+ class ConnectableObservable,
+ class Observable = void> // note: type order flipped versus the operator.
struct ref_count : public operator_base<T>
{
- typedef rxu::decay_t<ConnectableObservable> source_type;
+ typedef rxu::decay_t<Observable> observable_type;
+ typedef rxu::decay_t<ConnectableObservable> connectable_type;
- struct ref_count_state : public std::enable_shared_from_this<ref_count_state>
+ // ref_count() == false
+ // ref_count(other) == true
+ using has_observable_t = rxu::negation<std::is_same<void, Observable>>;
+ static constexpr bool has_observable_v = has_observable_t::value;
+
+ struct ref_count_state : public std::enable_shared_from_this<ref_count_state>,
+ public ref_count_state_base<ConnectableObservable, Observable>
{
- explicit ref_count_state(source_type o)
- : source(std::move(o))
+ template <class HasObservable = has_observable_t,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ rxu::negation<HasObservable>>>
+ explicit ref_count_state(connectable_type source)
+ : ref_count_state_base<ConnectableObservable, Observable>(std::move(source))
+ , subscribers(0)
+ {
+ }
+
+ template <bool HasObservableV = has_observable_v>
+ ref_count_state(connectable_type other,
+ typename std::enable_if<HasObservableV, observable_type>::type source)
+ : ref_count_state_base<ConnectableObservable, Observable>(std::move(other),
+ std::move(source))
, subscribers(0)
{
}
- source_type source;
std::mutex lock;
long subscribers;
composite_subscription connection;
};
std::shared_ptr<ref_count_state> state;
- explicit ref_count(source_type o)
- : state(std::make_shared<ref_count_state>(std::move(o)))
+ // connectable_observable<T> source = ...;
+ // source.ref_count();
+ //
+ // calls connect on source after the subscribe on source.
+ template <class HasObservable = has_observable_t,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ rxu::negation<HasObservable>>>
+ explicit ref_count(connectable_type source)
+ : state(std::make_shared<ref_count_state>(std::move(source)))
+ {
+ }
+
+ // connectable_observable<?> other = ...;
+ // observable<T> source = ...;
+ // source.ref_count(other);
+ //
+ // calls connect on 'other' after the subscribe on 'source'.
+ template <bool HasObservableV = has_observable_v>
+ ref_count(connectable_type other,
+ typename std::enable_if<HasObservableV, observable_type>::type source)
+ : state(std::make_shared<ref_count_state>(std::move(other), std::move(source)))
{
}
@@ -70,9 +157,9 @@ struct ref_count : public operator_base<T>
keepAlive->connection = composite_subscription();
}
});
- keepAlive->source.subscribe(std::forward<Subscriber>(o));
+ keepAlive->subscribe(std::forward<Subscriber>(o));
if (needConnect) {
- keepAlive->source.connect(keepAlive->connection);
+ keepAlive->connectable.connect(keepAlive->connection);
}
}
};
@@ -104,11 +191,28 @@ struct member_overload<ref_count_tag>
return Result(RefCount(std::forward<ConnectableObservable>(o)));
}
+ template<class Observable,
+ class ConnectableObservable,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>,
+ is_connectable_observable<ConnectableObservable>>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class RefCount = rxo::detail::ref_count<SourceValue,
+ rxu::decay_t<ConnectableObservable>,
+ rxu::decay_t<Observable>>,
+ class Value = rxu::value_type_t<RefCount>,
+ class Result = observable<Value, RefCount>
+ >
+ static Result member(Observable&& o, ConnectableObservable&& other) {
+ return Result(RefCount(std::forward<ConnectableObservable>(other),
+ std::forward<Observable>(o)));
+ }
+
template<class... AN>
static operators::detail::ref_count_invalid_t<AN...> member(AN...) {
std::terminate();
return {};
- static_assert(sizeof...(AN) == 10000, "ref_count takes no arguments");
+ static_assert(sizeof...(AN) == 10000, "ref_count takes (optional ConnectableObservable)");
}
};
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 97bcabd..4f42007 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -574,6 +574,17 @@ public:
static_assert(sizeof...(AN) == 0, "as_dynamic() was passed too many arguments.");
}
+ /*! @copydoc rx-ref_count.hpp
+ */
+ template<class... AN>
+ auto ref_count(AN... an) const // ref_count(ConnectableObservable&&)
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(observable_member(ref_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
+ /// \endcond
+ {
+ return observable_member(ref_count_tag{}, *this, std::forward<AN>(an)...);
+ }
+
/*! @copydoc rxcpp::operators::as_blocking
*/
template<class... AN>
diff --git a/Rx/v2/test/operators/publish.cpp b/Rx/v2/test/operators/publish.cpp
index c977597..fe578d7 100644
--- a/Rx/v2/test/operators/publish.cpp
+++ b/Rx/v2/test/operators/publish.cpp
@@ -2,6 +2,8 @@
#include <rxcpp/operators/rx-publish.hpp>
#include <rxcpp/operators/rx-connect_forever.hpp>
#include <rxcpp/operators/rx-ref_count.hpp>
+#include <rxcpp/operators/rx-map.hpp>
+#include <rxcpp/operators/rx-merge.hpp>
SCENARIO("publish range", "[!hide][range][subject][publish][subject][operators]"){
@@ -38,6 +40,104 @@ SCENARIO("publish range", "[!hide][range][subject][publish][subject][operators]"
}
}
+SCENARIO("publish ref_count", "[range][subject][publish][ref_count][operators]"){
+ GIVEN("a range"){
+ WHEN("ref_count is used"){
+ auto published = rxs::range<int>(0, 3).publish().ref_count();
+
+ std::vector<int> results;
+ published.subscribe(
+ // on_next
+ [&](int v){
+ results.push_back(v);
+ },
+ // on_completed
+ [](){});
+
+ std::vector<int> expected_results;
+ expected_results.push_back(0);
+ expected_results.push_back(1);
+ expected_results.push_back(2);
+ expected_results.push_back(3);
+
+ CHECK(results == expected_results);
+ }
+ WHEN("ref_count(other) is used"){
+ auto published = rxs::range<double>(0, 10).publish();
+ auto map_to_int = published.map([](double v) { return (long) v; });
+
+ // Ensures that 'ref_count(other)' has the source value type,
+ // not the publisher's value type.
+ auto with_ref_count = map_to_int.ref_count(published);
+
+ std::vector<long> results;
+
+ with_ref_count.subscribe(
+ // on_next
+ [&](long v){
+ results.push_back(v);
+ },
+ // on_completed
+ [](){});
+
+ std::vector<long> expected_results;
+ for (long i = 0; i <= 10; ++i) {
+ expected_results.push_back(i);
+ }
+ CHECK(results == expected_results);
+ }
+ WHEN("ref_count(other) is used in a diamond"){
+ auto source = rxs::range<double>(0, 3);
+
+ int published_on_next_count = 0;
+ // Ensure we only subscribe once to 'published' when its in a diamond.
+ auto next = source.map(
+ [&](double v) {
+ published_on_next_count++;
+ return v;
+ }
+ );
+ auto published = next.publish();
+
+ // Ensures that 'x.ref_count(other)' has the 'x' value type, not the other's value
+ // type.
+ auto map_to_int = published.map([](double v) { return (long) v; });
+
+ auto left = map_to_int.map([](long v) { return v * 2; });
+ auto right = map_to_int.map([](long v) { return v * 100; });
+
+ auto merge = left.merge(right);
+ auto with_ref_count = merge.ref_count(published);
+
+ std::vector<long> results;
+
+ with_ref_count.subscribe(
+ // on_next
+ [&](long v){
+ results.push_back(v);
+ },
+ // on_completed
+ [](){});
+
+ // Ensure we only subscribe once to 'published' when its in a diamond.
+ CHECK(published_on_next_count == 4);
+
+ std::vector<long> expected_results;
+ expected_results.push_back(0);
+ expected_results.push_back(0);
+ expected_results.push_back(2);
+ expected_results.push_back(100);
+ expected_results.push_back(4);
+ expected_results.push_back(200);
+ expected_results.push_back(6);
+ expected_results.push_back(300);
+
+ // Ensure left,right is interleaved without being biased towards one side.
+ CHECK(results == expected_results);
+ }
+ }
+}
+
SCENARIO("publish basic", "[publish][multicast][subject][operators]"){
GIVEN("a test hot observable of longs"){
auto sc = rxsc::make_test();
diff --git a/projects/doxygen/CMakeLists.txt b/projects/doxygen/CMakeLists.txt
index 6620a7a..1c1ba4c 100644
--- a/projects/doxygen/CMakeLists.txt
+++ b/projects/doxygen/CMakeLists.txt
@@ -84,6 +84,7 @@ if(DOXYGEN_FOUND)
${DOXY_EXAMPLES_SRC_DIR}/publish.cpp
${DOXY_EXAMPLES_SRC_DIR}/range.cpp
${DOXY_EXAMPLES_SRC_DIR}/reduce.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/ref_count.cpp
${DOXY_EXAMPLES_SRC_DIR}/repeat.cpp
${DOXY_EXAMPLES_SRC_DIR}/replay.cpp
${DOXY_EXAMPLES_SRC_DIR}/retry.cpp