summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/rx-connectable_observable.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/v2/src/rxcpp/rx-connectable_observable.hpp')
-rw-r--r--Rx/v2/src/rxcpp/rx-connectable_observable.hpp211
1 files changed, 0 insertions, 211 deletions
diff --git a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp b/Rx/v2/src/rxcpp/rx-connectable_observable.hpp
deleted file mode 100644
index 7038e24..0000000
--- a/Rx/v2/src/rxcpp/rx-connectable_observable.hpp
+++ /dev/null
@@ -1,211 +0,0 @@
-// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
-
-#pragma once
-
-#if !defined(RXCPP_RX_CONNECTABLE_OBSERVABLE_HPP)
-#define RXCPP_RX_CONNECTABLE_OBSERVABLE_HPP
-
-#include "rx-includes.hpp"
-
-namespace rxcpp {
-
-namespace detail {
-
-template<class T>
-struct has_on_connect
-{
- struct not_void {};
- template<class CT>
- static auto check(int) -> decltype((*(CT*)nullptr).on_connect(composite_subscription()));
- template<class CT>
- static not_void check(...);
-
- typedef decltype(check<T>(0)) detail_result;
- static const bool value = std::is_same<detail_result, void>::value;
-};
-
-}
-
-template<class T>
-class dynamic_connectable_observable
- : public dynamic_observable<T>
-{
- struct state_type
- : public std::enable_shared_from_this<state_type>
- {
- typedef std::function<void(composite_subscription)> onconnect_type;
-
- onconnect_type on_connect;
- };
- std::shared_ptr<state_type> state;
-
- template<class U>
- void construct(const dynamic_observable<U>& o, tag_dynamic_observable&&) {
- state = o.state;
- }
-
- template<class U>
- void construct(dynamic_observable<U>&& o, tag_dynamic_observable&&) {
- state = std::move(o.state);
- }
-
- template<class SO>
- void construct(SO&& source, rxs::tag_source&&) {
- auto so = std::make_shared<rxu::decay_t<SO>>(std::forward<SO>(source));
- state->on_connect = [so](composite_subscription cs) mutable {
- so->on_connect(std::move(cs));
- };
- }
-
-public:
-
- typedef tag_dynamic_observable dynamic_observable_tag;
-
- dynamic_connectable_observable()
- {
- }
-
- template<class SOF>
- explicit dynamic_connectable_observable(SOF sof)
- : dynamic_observable<T>(sof)
- , state(std::make_shared<state_type>())
- {
- construct(std::move(sof),
- typename std::conditional<is_dynamic_observable<SOF>::value, tag_dynamic_observable, rxs::tag_source>::type());
- }
-
- template<class SF, class CF>
- dynamic_connectable_observable(SF&& sf, CF&& cf)
- : dynamic_observable<T>(std::forward<SF>(sf))
- , state(std::make_shared<state_type>())
- {
- state->on_connect = std::forward<CF>(cf);
- }
-
- using dynamic_observable<T>::on_subscribe;
-
- void on_connect(composite_subscription cs) const {
- state->on_connect(std::move(cs));
- }
-};
-
-template<class T, class Source>
-connectable_observable<T> make_dynamic_connectable_observable(Source&& s) {
- return connectable_observable<T>(dynamic_connectable_observable<T>(std::forward<Source>(s)));
-}
-
-
-/*!
- \brief a source of values that is shared across all subscribers and does not start until connectable_observable::connect() is called.
-
- \ingroup group-observable
-
-*/
-template<class T, class SourceOperator>
-class connectable_observable
- : public observable<T, SourceOperator>
-{
- typedef connectable_observable<T, SourceOperator> this_type;
- typedef observable<T, SourceOperator> base_type;
- typedef rxu::decay_t<SourceOperator> source_operator_type;
-
- static_assert(detail::has_on_connect<source_operator_type>::value, "inner must have on_connect method void(composite_subscription)");
-
-public:
- typedef tag_connectable_observable observable_tag;
-
- connectable_observable()
- {
- }
-
- explicit connectable_observable(const SourceOperator& o)
- : base_type(o)
- {
- }
- explicit connectable_observable(SourceOperator&& o)
- : base_type(std::move(o))
- {
- }
-
- // implicit conversion between observables of the same value_type
- template<class SO>
- connectable_observable(const connectable_observable<T, SO>& o)
- : base_type(o)
- {}
- // implicit conversion between observables of the same value_type
- template<class SO>
- connectable_observable(connectable_observable<T, SO>&& o)
- : base_type(std::move(o))
- {}
-
- ///
- /// takes any function that will take this observable and produce a result value.
- /// this is intended to allow externally defined operators, that use subscribe,
- /// to be connected into the expression.
- ///
- template<class OperatorFactory>
- auto op(OperatorFactory&& of) const
- -> decltype(of(*(const this_type*)nullptr)) {
- return of(*this);
- static_assert(is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)");
- }
-
- ///
- /// performs type-forgetting conversion to a new composite_observable
- ///
- connectable_observable<T> as_dynamic() {
- return *this;
- }
-
- composite_subscription connect(composite_subscription cs = composite_subscription()) {
- base_type::source_operator.on_connect(cs);
- return cs;
- }
-
- /*! @copydoc rx-ref_count.hpp
- */
- template<class... AN>
- auto ref_count(AN... an) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> decltype(observable_member(ref_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
- /// \endcond
- {
- return observable_member(ref_count_tag{}, *this, std::forward<AN>(an)...);
- }
-
- /*! @copydoc rx-connect_forever.hpp
- */
- template<class... AN>
- auto connect_forever(AN... an) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> decltype(observable_member(connect_forever_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
- /// \endcond
- {
- return observable_member(connect_forever_tag{}, *this, std::forward<AN>(an)...);
- }
-};
-
-
-}
-
-//
-// support range() >> filter() >> subscribe() syntax
-// '>>' is spelled 'stream'
-//
-template<class T, class SourceOperator, class OperatorFactory>
-auto operator >> (const rxcpp::connectable_observable<T, SourceOperator>& source, OperatorFactory&& of)
- -> decltype(source.op(std::forward<OperatorFactory>(of))) {
- return source.op(std::forward<OperatorFactory>(of));
-}
-
-//
-// support range() | filter() | subscribe() syntax
-// '|' is spelled 'pipe'
-//
-template<class T, class SourceOperator, class OperatorFactory>
-auto operator | (const rxcpp::connectable_observable<T, SourceOperator>& source, OperatorFactory&& of)
- -> decltype(source.op(std::forward<OperatorFactory>(of))) {
- return source.op(std::forward<OperatorFactory>(of));
-}
-
-#endif