diff options
Diffstat (limited to 'Rx/v2/src/rxcpp/rx-observable.hpp')
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 1811 |
1 files changed, 0 insertions, 1811 deletions
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp deleted file mode 100644 index 7e3d567..0000000 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ /dev/null @@ -1,1811 +0,0 @@ -// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. - -#pragma once - -#if !defined(RXCPP_RX_OBSERVABLE_HPP) -#define RXCPP_RX_OBSERVABLE_HPP - -#include "rx-includes.hpp" - -#ifdef __GNUG__ -#define EXPLICIT_THIS this-> -#else -#define EXPLICIT_THIS -#endif - -namespace rxcpp { - -namespace detail { - -template<class Subscriber, class T> -struct has_on_subscribe_for -{ - struct not_void {}; - template<class CS, class CT> - static auto check(int) -> decltype((*(CT*)nullptr).on_subscribe(*(CS*)nullptr)); - template<class CS, class CT> - static not_void check(...); - - typedef decltype(check<rxu::decay_t<Subscriber>, T>(0)) detail_result; - static const bool value = std::is_same<detail_result, void>::value; -}; - -} - -template<class T> -class dynamic_observable - : public rxs::source_base<T> -{ - struct state_type - : public std::enable_shared_from_this<state_type> - { - typedef std::function<void(subscriber<T>)> onsubscribe_type; - - onsubscribe_type on_subscribe; - }; - std::shared_ptr<state_type> state; - - template<class U> - friend bool operator==(const dynamic_observable<U>&, const dynamic_observable<U>&); - - template<class SO> - void construct(SO&& source, rxs::tag_source&&) { - rxu::decay_t<SO> so = std::forward<SO>(source); - state->on_subscribe = [so](subscriber<T> o) mutable { - so.on_subscribe(std::move(o)); - }; - } - - struct tag_function {}; - template<class F> - void construct(F&& f, tag_function&&) { - state->on_subscribe = std::forward<F>(f); - } - -public: - - typedef tag_dynamic_observable dynamic_observable_tag; - - dynamic_observable() - { - } - - template<class SOF> - explicit dynamic_observable(SOF&& sof, typename std::enable_if<!is_dynamic_observable<SOF>::value, void**>::type = 0) - : state(std::make_shared<state_type>()) - { - construct(std::forward<SOF>(sof), - typename std::conditional<rxs::is_source<SOF>::value || rxo::is_operator<SOF>::value, rxs::tag_source, tag_function>::type()); - } - - void on_subscribe(subscriber<T> o) const { - state->on_subscribe(std::move(o)); - } - - template<class Subscriber> - typename std::enable_if<is_subscriber<Subscriber>::value, void>::type - on_subscribe(Subscriber o) const { - state->on_subscribe(o.as_dynamic()); - } -}; - -template<class T> -inline bool operator==(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) { - return lhs.state == rhs.state; -} -template<class T> -inline bool operator!=(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) { - return !(lhs == rhs); -} - -template<class T, class Source> -observable<T> make_observable_dynamic(Source&& s) { - return observable<T>(dynamic_observable<T>(std::forward<Source>(s))); -} - -namespace detail { -template<bool Selector, class Default, class SO> -struct resolve_observable; - -template<class Default, class SO> -struct resolve_observable<true, Default, SO> -{ - typedef typename SO::type type; - typedef typename type::value_type value_type; - static const bool value = true; - typedef observable<value_type, type> observable_type; - template<class... AN> - static observable_type make(const Default&, AN&&... an) { - return observable_type(type(std::forward<AN>(an)...)); - } -}; -template<class Default, class SO> -struct resolve_observable<false, Default, SO> -{ - static const bool value = false; - typedef Default observable_type; - template<class... AN> - static observable_type make(const observable_type& that, const AN&...) { - return that; - } -}; -template<class SO> -struct resolve_observable<true, void, SO> -{ - typedef typename SO::type type; - typedef typename type::value_type value_type; - static const bool value = true; - typedef observable<value_type, type> observable_type; - template<class... AN> - static observable_type make(AN&&... an) { - return observable_type(type(std::forward<AN>(an)...)); - } -}; -template<class SO> -struct resolve_observable<false, void, SO> -{ - static const bool value = false; - typedef void observable_type; - template<class... AN> - static observable_type make(const AN&...) { - } -}; - -} - -template<class Selector, class Default, template<class... TN> class SO, class... AN> -struct defer_observable - : public detail::resolve_observable<Selector::value, Default, rxu::defer_type<SO, AN...>> -{ -}; - -/*! - \brief a source of values whose methods block until all values have been emitted. subscribe or use one of the operator methods that reduce the values emitted to a single value. - - \ingroup group-observable - -*/ -template<class T, class Observable> -class blocking_observable -{ - template<class Obsvbl, class... ArgN> - static auto blocking_subscribe(const Obsvbl& source, bool do_rethrow, ArgN&&... an) - -> void { - std::mutex lock; - std::condition_variable wake; - bool disposed = false; - - auto dest = make_subscriber<T>(std::forward<ArgN>(an)...); - - rxu::error_ptr error; - bool has_error = false; - - // keep any error to rethrow at the end. - // copy 'dest' by-value to avoid using it after it goes out of scope. - auto scbr = make_subscriber<T>( - dest, - [dest](T t){dest.on_next(t);}, - [dest,&error,&has_error,do_rethrow](rxu::error_ptr e){ - if (do_rethrow) { - has_error = true; - error = e; - } else { - dest.on_error(e); - } - }, - [dest](){dest.on_completed();} - ); - - auto cs = scbr.get_subscription(); - cs.add( - [&](){ - std::unique_lock<std::mutex> guard(lock); - wake.notify_one(); - disposed = true; - }); - - source.subscribe(std::move(scbr)); - - std::unique_lock<std::mutex> guard(lock); - wake.wait(guard, - [&](){ - return disposed; - }); - - if (has_error) {rxu::rethrow_exception(error);} - } - -public: - typedef rxu::decay_t<Observable> observable_type; - observable_type source; - ~blocking_observable() - { - } - blocking_observable(observable_type s) : source(std::move(s)) {} - - /// - /// `subscribe` will cause this observable to emit values to the provided subscriber. - /// - /// \return void - /// - /// \param an... - the arguments are passed to make_subscriber(). - /// - /// callers must provide enough arguments to make a subscriber. - /// overrides are supported. thus - /// `subscribe(thesubscriber, composite_subscription())` - /// will take `thesubscriber.get_observer()` and the provided - /// subscription and subscribe to the new subscriber. - /// the `on_next`, `on_error`, `on_completed` methods can be supplied instead of an observer - /// if a subscription or subscriber is not provided then a new subscription will be created. - /// - template<class... ArgN> - auto subscribe(ArgN&&... an) const - -> void { - return blocking_subscribe(source, false, std::forward<ArgN>(an)...); - } - - /// - /// `subscribe_with_rethrow` will cause this observable to emit values to the provided subscriber. - /// - /// \note If the source observable calls on_error, the raised exception is rethrown by this method. - /// - /// \note If the source observable calls on_error, the `on_error` method on the subscriber will not be called. - /// - /// \return void - /// - /// \param an... - the arguments are passed to make_subscriber(). - /// - /// callers must provide enough arguments to make a subscriber. - /// overrides are supported. thus - /// `subscribe(thesubscriber, composite_subscription())` - /// will take `thesubscriber.get_observer()` and the provided - /// subscription and subscribe to the new subscriber. - /// the `on_next`, `on_error`, `on_completed` methods can be supplied instead of an observer - /// if a subscription or subscriber is not provided then a new subscription will be created. - /// - template<class... ArgN> - auto subscribe_with_rethrow(ArgN&&... an) const - -> void { - return blocking_subscribe(source, true, std::forward<ArgN>(an)...); - } - - /*! Return the first item emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items. - - \return The first item emitted by this blocking_observable. - - \note If the source observable calls on_error, the raised exception is rethrown by this method. - - \sample - When the source observable emits at least one item: - \snippet blocking_observable.cpp blocking first sample - \snippet output.txt blocking first sample - - When the source observable is empty: - \snippet blocking_observable.cpp blocking first empty sample - \snippet output.txt blocking first empty sample - */ - template<class... AN> - auto first(AN**...) -> delayed_type_t<T, AN...> const { - rxu::maybe<T> result; - composite_subscription cs; - subscribe_with_rethrow( - cs, - [&](T v){result.reset(v); cs.unsubscribe();}); - if (result.empty()) - rxu::throw_exception(rxcpp::empty_error("first() requires a stream with at least one value")); - return result.get(); - static_assert(sizeof...(AN) == 0, "first() was passed too many arguments."); - } - - /*! Return the last item emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items. - - \return The last item emitted by this blocking_observable. - - \note If the source observable calls on_error, the raised exception is rethrown by this method. - - \sample - When the source observable emits at least one item: - \snippet blocking_observable.cpp blocking last sample - \snippet output.txt blocking last sample - - When the source observable is empty: - \snippet blocking_observable.cpp blocking last empty sample - \snippet output.txt blocking last empty sample - */ - template<class... AN> - auto last(AN**...) -> delayed_type_t<T, AN...> const { - rxu::maybe<T> result; - subscribe_with_rethrow( - [&](T v){result.reset(v);}); - if (result.empty()) - rxu::throw_exception(rxcpp::empty_error("last() requires a stream with at least one value")); - return result.get(); - static_assert(sizeof...(AN) == 0, "last() was passed too many arguments."); - } - - /*! Return the total number of items emitted by this blocking_observable. - - \return The total number of items emitted by this blocking_observable. - - \sample - \snippet blocking_observable.cpp blocking count sample - \snippet output.txt blocking count sample - - When the source observable calls on_error: - \snippet blocking_observable.cpp blocking count error sample - \snippet output.txt blocking count error sample - */ - int count() const { - int result = 0; - source.count().as_blocking().subscribe_with_rethrow( - [&](int v){result = v;}); - return result; - } - - /*! Return the sum of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items. - - \return The sum of all items emitted by this blocking_observable. - - \sample - When the source observable emits at least one item: - \snippet blocking_observable.cpp blocking sum sample - \snippet output.txt blocking sum sample - - When the source observable is empty: - \snippet blocking_observable.cpp blocking sum empty sample - \snippet output.txt blocking sum empty sample - - When the source observable calls on_error: - \snippet blocking_observable.cpp blocking sum error sample - \snippet output.txt blocking sum error sample - */ - T sum() const { - return source.sum().as_blocking().last(); - } - - /*! Return the average value of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items. - - \return The average value of all items emitted by this blocking_observable. - - \sample - When the source observable emits at least one item: - \snippet blocking_observable.cpp blocking average sample - \snippet output.txt blocking average sample - - When the source observable is empty: - \snippet blocking_observable.cpp blocking average empty sample - \snippet output.txt blocking average empty sample - - When the source observable calls on_error: - \snippet blocking_observable.cpp blocking average error sample - \snippet output.txt blocking average error sample - */ - double average() const { - return source.average().as_blocking().last(); - } - - /*! Return the max of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items. - - \return The max of all items emitted by this blocking_observable. - - \sample - When the source observable emits at least one item: - \snippet blocking_observable.cpp blocking max sample - \snippet output.txt blocking max sample - - When the source observable is empty: - \snippet blocking_observable.cpp blocking max empty sample - \snippet output.txt blocking max empty sample - - When the source observable calls on_error: - \snippet blocking_observable.cpp blocking max error sample - \snippet output.txt blocking max error sample -*/ - T max() const { - return source.max().as_blocking().last(); - } - - /*! Return the min of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items. - - \return The min of all items emitted by this blocking_observable. - - \sample - When the source observable emits at least one item: - \snippet blocking_observable.cpp blocking min sample - \snippet output.txt blocking min sample - - When the source observable is empty: - \snippet blocking_observable.cpp blocking min empty sample - \snippet output.txt blocking min empty sample - - When the source observable calls on_error: - \snippet blocking_observable.cpp blocking min error sample - \snippet output.txt blocking min error sample -*/ - T min() const { - return source.min().as_blocking().last(); - } -}; - -namespace detail { - -template<class SourceOperator, class Subscriber> -struct safe_subscriber -{ - safe_subscriber(SourceOperator& so, Subscriber& o) : so(std::addressof(so)), o(std::addressof(o)) {} - - void subscribe() { - RXCPP_TRY { - so->on_subscribe(*o); - } RXCPP_CATCH(...) { - if (!o->is_subscribed()) { - rxu::rethrow_current_exception(); - } - o->on_error(rxu::make_error_ptr(rxu::current_exception())); - o->unsubscribe(); - } - } - - void operator()(const rxsc::schedulable&) { - subscribe(); - } - - SourceOperator* so; - Subscriber* o; -}; - -} - -template<> -class observable<void, void>; - -/*! - \defgroup group-observable Observables - - \brief These are the set of observable classes in rxcpp. - - \class rxcpp::observable - - \ingroup group-observable group-core - - \brief a source of values. subscribe or use one of the operator methods that return a new observable, which uses this observable as a source. - - \par Some code - This sample will observable::subscribe() to values from a observable<void, void>::range(). - - \sample - \snippet range.cpp range sample - \snippet output.txt range sample - -*/ -template<class T, class SourceOperator> -class observable - : public observable_base<T> -{ - static_assert(std::is_same<T, typename SourceOperator::value_type>::value, "SourceOperator::value_type must be the same as T in observable<T, SourceOperator>"); - - typedef observable<T, SourceOperator> this_type; - -public: - typedef rxu::decay_t<SourceOperator> source_operator_type; - mutable source_operator_type source_operator; - -private: - - template<class U, class SO> - friend class observable; - - template<class U, class SO> - friend bool operator==(const observable<U, SO>&, const observable<U, SO>&); - - template<class Subscriber> - auto detail_subscribe(Subscriber o) const - -> composite_subscription { - - typedef rxu::decay_t<Subscriber> subscriber_type; - - static_assert(is_subscriber<subscriber_type>::value, "subscribe must be passed a subscriber"); - static_assert(std::is_same<typename source_operator_type::value_type, T>::value && std::is_convertible<T*, typename subscriber_type::value_type*>::value, "the value types in the sequence must match or be convertible"); - static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value, "inner must have on_subscribe method that accepts this subscriber "); - - trace_activity().subscribe_enter(*this, o); - - if (!o.is_subscribed()) { - trace_activity().subscribe_return(*this); - return o.get_subscription(); - } - - detail::safe_subscriber<source_operator_type, subscriber_type> subscriber(source_operator, o); - - // make sure to let current_thread take ownership of the thread as early as possible. - if (rxsc::current_thread::is_schedule_required()) { - const auto& sc = rxsc::make_current_thread(); - sc.create_worker(o.get_subscription()).schedule(subscriber); - } else { - // current_thread already owns this thread. - subscriber.subscribe(); - } - - trace_activity().subscribe_return(*this); - return o.get_subscription(); - } - -public: - typedef T value_type; - - static_assert(rxo::is_operator<source_operator_type>::value || rxs::is_source<source_operator_type>::value, "observable must wrap an operator or source"); - - ~observable() - { - } - - observable() - { - } - - explicit observable(const source_operator_type& o) - : source_operator(o) - { - } - explicit observable(source_operator_type&& o) - : source_operator(std::move(o)) - { - } - - /// implicit conversion between observables of the same value_type - template<class SO> - observable(const observable<T, SO>& o) - : source_operator(o.source_operator) - {} - /// implicit conversion between observables of the same value_type - template<class SO> - observable(observable<T, SO>&& o) - : source_operator(std::move(o.source_operator)) - {} - -#if 0 - template<class I> - void on_subscribe(observer<T, I> o) const { - source_operator.on_subscribe(o); - } -#endif - - /*! @copydoc rxcpp::operators::as_dynamic - */ - template<class... AN> - observable<T> as_dynamic(AN**...) const { - return *this; - static_assert(sizeof...(AN) == 0, "as_dynamic() was passed too many arguments."); - } - - /*! @copydoc rx-ref_count.hpp - */ - template<class... AN> - auto ref_count(AN... an) const // ref_count(ConnectableObservable&&) - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(ref_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(ref_count_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rxcpp::operators::as_blocking - */ - template<class... AN> - blocking_observable<T, this_type> as_blocking(AN**...) const { - return blocking_observable<T, this_type>(*this); - static_assert(sizeof...(AN) == 0, "as_blocking() was passed too many arguments."); - } - - /// \cond SHOW_SERVICE_MEMBERS - - /// - /// takes any function that will take this observable and produce a result value. - /// this is intended to allow externally defined operators, that use subscribe, - /// to be connected into the expression. - /// - template<class OperatorFactory> - auto op(OperatorFactory&& of) const - -> decltype(of(*(const this_type*)nullptr)) { - return of(*this); - static_assert(is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)"); - } - - /*! @copydoc rx-lift.hpp - */ - template<class ResultType, class Operator> - auto lift(Operator&& op) const - -> observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>> { - return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>( - rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op))); - static_assert(detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value, "Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)"); - } - - /// - /// takes any function that will take a subscriber for this observable and produce a subscriber. - /// this is intended to allow externally defined operators, that use make_subscriber, to be connected - /// into the expression. - /// - template<class ResultType, class Operator> - auto lift_if(Operator&& op) const - -> typename std::enable_if<detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value, - observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>>::type { - return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>( - rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op))); - } - /// - /// takes any function that will take a subscriber for this observable and produce a subscriber. - /// this is intended to allow externally defined operators, that use make_subscriber, to be connected - /// into the expression. - /// - template<class ResultType, class Operator> - auto lift_if(Operator&&) const - -> typename std::enable_if<!detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value, - decltype(rxs::from<ResultType>())>::type { - return rxs::from<ResultType>(); - } - /// \endcond - - /*! @copydoc rx-subscribe.hpp - */ - template<class... ArgN> - auto subscribe(ArgN&&... an) const - -> composite_subscription { - return detail_subscribe(make_subscriber<T>(std::forward<ArgN>(an)...)); - } - - /*! @copydoc rx-all.hpp - */ - template<class... AN> - auto all(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(all_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(all_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rxcpp::operators::is_empty - */ - template<class... AN> - auto is_empty(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(is_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(is_empty_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-any.hpp - */ - template<class... AN> - auto any(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(any_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(any_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rxcpp::operators::exists - */ - template<class... AN> - auto exists(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(exists_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(exists_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rxcpp::operators::contains - */ - template<class... AN> - auto contains(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(contains_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(contains_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-filter.hpp - */ - template<class... AN> - auto filter(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(filter_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(filter_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-switch_if_empty.hpp - */ - template<class... AN> - auto switch_if_empty(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(switch_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(switch_if_empty_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rxcpp::operators::default_if_empty - */ - template<class... AN> - auto default_if_empty(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(default_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(default_if_empty_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-sequence_equal.hpp - */ - template<class... AN> - auto sequence_equal(AN... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(sequence_equal_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(sequence_equal_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-tap.hpp - */ - template<class... AN> - auto tap(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(tap_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(tap_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-time_interval.hpp - */ - template<class... AN> - auto time_interval(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(time_interval_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(time_interval_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-timeout.hpp - */ - template<class... AN> - auto timeout(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(timeout_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(timeout_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-timestamp.hpp - */ - template<class... AN> - auto timestamp(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(timestamp_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(timestamp_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-finally.hpp - */ - template<class... AN> - auto finally(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(finally_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(finally_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-on_error_resume_next.hpp - */ - template<class... AN> - auto on_error_resume_next(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(on_error_resume_next_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-on_error_resume_next.hpp - */ - template<class... AN> - auto switch_on_error(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(on_error_resume_next_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-map.hpp - */ - template<class... AN> - auto map(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(map_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-map.hpp - */ - template<class... AN> - auto transform(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(map_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-debounce.hpp - */ - template<class... AN> - auto debounce(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(debounce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(debounce_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-delay.hpp - */ - template<class... AN> - auto delay(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(delay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(delay_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-distinct.hpp - */ - template<class... AN> - auto distinct(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(distinct_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(distinct_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-distinct_until_changed.hpp - */ - template<class... AN> - auto distinct_until_changed(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(distinct_until_changed_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(distinct_until_changed_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-element_at.hpp - */ - template<class... AN> - auto element_at(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(element_at_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(element_at_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-window.hpp - */ - template<class... AN> - auto window(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(window_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(window_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-window_time.hpp - */ - template<class... AN> - auto window_with_time(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(window_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(window_with_time_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-window_time_count.hpp - */ - template<class... AN> - auto window_with_time_or_count(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(window_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(window_with_time_or_count_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-window_toggle.hpp - */ - template<class... AN> - auto window_toggle(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(window_toggle_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(window_toggle_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-buffer_count.hpp - */ - template<class... AN> - auto buffer(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(buffer_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(buffer_count_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-buffer_time.hpp - */ - template<class... AN> - auto buffer_with_time(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(buffer_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(buffer_with_time_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-buffer_time_count.hpp - */ - template<class... AN> - auto buffer_with_time_or_count(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(buffer_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(buffer_with_time_or_count_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-switch_on_next.hpp - */ - template<class... AN> - auto switch_on_next(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(switch_on_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(switch_on_next_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-merge.hpp - */ - template<class... AN> - auto merge(AN... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(merge_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(merge_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-merge_delay_error.hpp - */ - template<class... AN> - auto merge_delay_error(AN... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(merge_delay_error_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(merge_delay_error_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-amb.hpp - */ - template<class... AN> - auto amb(AN... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(amb_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(amb_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-flat_map.hpp - */ - template<class... AN> - auto flat_map(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(flat_map_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-flat_map.hpp - */ - template<class... AN> - auto merge_transform(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(flat_map_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-concat.hpp - */ - template<class... AN> - auto concat(AN... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(concat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(concat_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-concat_map.hpp - */ - template<class... AN> - auto concat_map(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(concat_map_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-concat_map.hpp - */ - template<class... AN> - auto concat_transform(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(concat_map_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-with_latest_from.hpp - */ - template<class... AN> - auto with_latest_from(AN... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(with_latest_from_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(with_latest_from_tag{}, *this, std::forward<AN>(an)...); - } - - - /*! @copydoc rx-combine_latest.hpp - */ - template<class... AN> - auto combine_latest(AN... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(combine_latest_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(combine_latest_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-zip.hpp - */ - template<class... AN> - auto zip(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(zip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(zip_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-group_by.hpp - */ - template<class... AN> - inline auto group_by(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(group_by_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(group_by_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-ignore_elements.hpp - */ - template<class... AN> - auto ignore_elements(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(ignore_elements_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(ignore_elements_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-muticast.hpp - */ - template<class... AN> - auto multicast(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(multicast_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(multicast_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-publish.hpp - */ - template<class... AN> - auto publish(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(publish_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(publish_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rxcpp::operators::publish_synchronized - */ - template<class... AN> - auto publish_synchronized(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(publish_synchronized_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(publish_synchronized_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-replay.hpp - */ - template<class... AN> - auto replay(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(replay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(replay_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-subscribe_on.hpp - */ - template<class... AN> - auto subscribe_on(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(subscribe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(subscribe_on_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-observe_on.hpp - */ - template<class... AN> - auto observe_on(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(observe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(observe_on_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-reduce.hpp - */ - template<class... AN> - auto reduce(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(reduce_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-reduce.hpp - */ - template<class... AN> - auto accumulate(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(reduce_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rxcpp::operators::first - */ - template<class... AN> - auto first(AN**...) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(delayed_type<first_tag, AN...>::value(), *(this_type*)nullptr)) - /// \endcond - { - return observable_member(delayed_type<first_tag, AN...>::value(), *this); - static_assert(sizeof...(AN) == 0, "first() was passed too many arguments."); - } - - /*! @copydoc rxcpp::operators::last - */ - template<class... AN> - auto last(AN**...) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(delayed_type<last_tag, AN...>::value(), *(this_type*)nullptr)) - /// \endcond - { - return observable_member(delayed_type<last_tag, AN...>::value(), *this); - static_assert(sizeof...(AN) == 0, "last() was passed too many arguments."); - } - - /*! @copydoc rxcpp::operators::count - */ - template<class... AN> - auto count(AN**...) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(delayed_type<reduce_tag, AN...>::value(), *(this_type*)nullptr, 0, rxu::count(), identity_for<int>())) - /// \endcond - { - return observable_member(delayed_type<reduce_tag, AN...>::value(), *this, 0, rxu::count(), identity_for<int>()); - static_assert(sizeof...(AN) == 0, "count() was passed too many arguments."); - } - - /*! @copydoc rxcpp::operators::sum - */ - template<class... AN> - auto sum(AN**...) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(delayed_type<sum_tag, AN...>::value(), *(this_type*)nullptr)) - /// \endcond - { - return observable_member(delayed_type<sum_tag, AN...>::value(), *this); - static_assert(sizeof...(AN) == 0, "sum() was passed too many arguments."); - } - - /*! @copydoc rxcpp::operators::average - */ - template<class... AN> - auto average(AN**...) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(delayed_type<average_tag, AN...>::value(), *(this_type*)nullptr)) - /// \endcond - { - return observable_member(delayed_type<average_tag, AN...>::value(), *this); - static_assert(sizeof...(AN) == 0, "average() was passed too many arguments."); - } - - /*! @copydoc rxcpp::operators::max - */ - template<class... AN> - auto max(AN**...) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(delayed_type<max_tag, AN...>::value(), *(this_type*)nullptr)) - /// \endcond - { - return observable_member(delayed_type<max_tag, AN...>::value(), *this); - static_assert(sizeof...(AN) == 0, "max() was passed too many arguments."); - } - - /*! @copydoc rxcpp::operators::min - */ - template<class... AN> - auto min(AN**...) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(delayed_type<min_tag, AN...>::value(), *(this_type*)nullptr)) - /// \endcond - { - return observable_member(delayed_type<min_tag, AN...>::value(), *this); - static_assert(sizeof...(AN) == 0, "min() was passed too many arguments."); - } - - /*! @copydoc rx-scan.hpp - */ - template<class... AN> - auto scan(AN... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(scan_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(scan_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-sample_time.hpp - */ - template<class... AN> - auto sample_with_time(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(sample_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(sample_with_time_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-skip.hpp - */ - template<class... AN> - auto skip(AN... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(skip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(skip_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-skip.hpp - */ - template<class... AN> - auto skip_while(AN... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(skip_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(skip_while_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-skip_last.hpp - */ - template<class... AN> - auto skip_last(AN... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(skip_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(skip_last_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-skip_until.hpp - */ - template<class... AN> - auto skip_until(AN... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(skip_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(skip_until_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-take.hpp - */ - template<class... AN> - auto take(AN... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(take_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(take_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-take_last.hpp - */ - template<class... AN> - auto take_last(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(take_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(take_last_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-take_until.hpp - */ - template<class... AN> - auto take_until(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(take_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(take_until_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-take_while.hpp - */ - template<class... AN> - auto take_while(AN&&... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(take_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(take_while_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-repeat.hpp - */ - template<class... AN> - auto repeat(AN... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(repeat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(repeat_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-retry.hpp - */ - template<class... AN> - auto retry(AN... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(retry_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(retry_tag{}, *(this_type*)this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-start_with.hpp - */ - template<class... AN> - auto start_with(AN... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(start_with_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(start_with_tag{}, *this, std::forward<AN>(an)...); - } - - /*! @copydoc rx-pairwise.hpp - */ - template<class... AN> - auto pairwise(AN... an) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(observable_member(pairwise_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) - /// \endcond - { - return observable_member(pairwise_tag{}, *this, std::forward<AN>(an)...); - } -}; - -template<class T, class SourceOperator> -inline bool operator==(const observable<T, SourceOperator>& lhs, const observable<T, SourceOperator>& rhs) { - return lhs.source_operator == rhs.source_operator; -} -template<class T, class SourceOperator> -inline bool operator!=(const observable<T, SourceOperator>& lhs, const observable<T, SourceOperator>& rhs) { - return !(lhs == rhs); -} - -/*! - \defgroup group-core Basics - - \brief These are the core classes that combine to represent a set of values emitted over time that can be cancelled. - - \class rxcpp::observable<void, void> - - \brief typed as ```rxcpp::observable<>```, this is a collection of factory methods that return an observable. - - \ingroup group-core - - \par Create a new type of observable - - \sample - \snippet create.cpp Create sample - \snippet output.txt Create sample - - \par Create an observable that emits a range of values - - \sample - \snippet range.cpp range sample - \snippet output.txt range sample - - \par Create an observable that emits nothing / generates an error / immediately completes - - \sample - \snippet never.cpp never sample - \snippet output.txt never sample - \snippet error.cpp error sample - \snippet output.txt error sample - \snippet empty.cpp empty sample - \snippet output.txt empty sample - - \par Create an observable that generates new observable for each subscriber - - \sample - \snippet defer.cpp defer sample - \snippet output.txt defer sample - - \par Create an observable that emits items every specified interval of time - - \sample - \snippet interval.cpp interval sample - \snippet output.txt interval sample - - \par Create an observable that emits items in the specified interval of time - - \sample - \snippet timer.cpp duration timer sample - \snippet output.txt duration timer sample - - \par Create an observable that emits all items from a collection - - \sample - \snippet iterate.cpp iterate sample - \snippet output.txt iterate sample - - \par Create an observable that emits a set of specified items - - \sample - \snippet from.cpp from sample - \snippet output.txt from sample - - \par Create an observable that emits a single item - - \sample - \snippet just.cpp just sample - \snippet output.txt just sample - - \par Create an observable that emits a set of items and then subscribes to another observable - - \sample - \snippet start_with.cpp full start_with sample - \snippet output.txt full start_with sample - - \par Create an observable that generates a new observable based on a generated resource for each subscriber - - \sample - \snippet scope.cpp scope sample - \snippet output.txt scope sample - -*/ -template<> -class observable<void, void> -{ - ~observable(); -public: - /*! @copydoc rx-create.hpp - */ - template<class T, class OnSubscribe> - static auto create(OnSubscribe os) - -> decltype(rxs::create<T>(std::move(os))) { - return rxs::create<T>(std::move(os)); - } - - /*! @copydoc rx-range.hpp - */ - template<class T> - static auto range(T first = 0, T last = std::numeric_limits<T>::max(), std::ptrdiff_t step = 1) - -> decltype(rxs::range<T>(first, last, step, identity_current_thread())) { - return rxs::range<T>(first, last, step, identity_current_thread()); - } - /*! @copydoc rx-range.hpp - */ - template<class T, class Coordination> - static auto range(T first, T last, std::ptrdiff_t step, Coordination cn) - -> decltype(rxs::range<T>(first, last, step, std::move(cn))) { - return rxs::range<T>(first, last, step, std::move(cn)); - } - /*! @copydoc rx-range.hpp - */ - template<class T, class Coordination> - static auto range(T first, T last, Coordination cn) - -> decltype(rxs::range<T>(first, last, std::move(cn))) { - return rxs::range<T>(first, last, std::move(cn)); - } - /*! @copydoc rx-range.hpp - */ - template<class T, class Coordination> - static auto range(T first, Coordination cn) - -> decltype(rxs::range<T>(first, std::move(cn))) { - return rxs::range<T>(first, std::move(cn)); - } - - /*! @copydoc rx-never.hpp - */ - template<class T> - static auto never() - -> decltype(rxs::never<T>()) { - return rxs::never<T>(); - } - - /*! @copydoc rx-defer.hpp - */ - template<class ObservableFactory> - static auto defer(ObservableFactory of) - -> decltype(rxs::defer(std::move(of))) { - return rxs::defer(std::move(of)); - } - - /*! @copydoc rx-interval.hpp - */ - template<class... AN> - static auto interval(rxsc::scheduler::clock_type::duration period, AN**...) - -> decltype(rxs::interval(period)) { - return rxs::interval(period); - static_assert(sizeof...(AN) == 0, "interval(period) was passed too many arguments."); - } - /*! @copydoc rx-interval.hpp - */ - template<class Coordination> - static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn) - -> decltype(rxs::interval(period, std::move(cn))) { - return rxs::interval(period, std::move(cn)); - } - /*! @copydoc rx-interval.hpp - */ - template<class... AN> - static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN**...) - -> decltype(rxs::interval(initial, period)) { - return rxs::interval(initial, period); - static_assert(sizeof...(AN) == 0, "interval(initial, period) was passed too many arguments."); - } - /*! @copydoc rx-interval.hpp - */ - template<class Coordination> - static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn) - -> decltype(rxs::interval(initial, period, std::move(cn))) { - return rxs::interval(initial, period, std::move(cn)); - } - - /*! @copydoc rx-timer.hpp - */ - template<class... AN> - static auto timer(rxsc::scheduler::clock_type::time_point at, AN**...) - -> decltype(rxs::timer(at)) { - return rxs::timer(at); - static_assert(sizeof...(AN) == 0, "timer(at) was passed too many arguments."); - } - /*! @copydoc rx-timer.hpp - */ - template<class... AN> - static auto timer(rxsc::scheduler::clock_type::duration after, AN**...) - -> decltype(rxs::timer(after)) { - return rxs::timer(after); - static_assert(sizeof...(AN) == 0, "timer(after) was passed too many arguments."); - } - /*! @copydoc rx-timer.hpp - */ - template<class Coordination> - static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn) - -> decltype(rxs::timer(when, std::move(cn))) { - return rxs::timer(when, std::move(cn)); - } - /*! @copydoc rx-timer.hpp - */ - template<class Coordination> - static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn) - -> decltype(rxs::timer(when, std::move(cn))) { - return rxs::timer(when, std::move(cn)); - } - - /*! @copydoc rx-iterate.hpp - */ - template<class Collection> - static auto iterate(Collection c) - -> decltype(rxs::iterate(std::move(c), identity_current_thread())) { - return rxs::iterate(std::move(c), identity_current_thread()); - } - /*! @copydoc rx-iterate.hpp - */ - template<class Collection, class Coordination> - static auto iterate(Collection c, Coordination cn) - -> decltype(rxs::iterate(std::move(c), std::move(cn))) { - return rxs::iterate(std::move(c), std::move(cn)); - } - - /*! @copydoc rxcpp::sources::from() - */ - template<class T> - static auto from() - -> decltype( rxs::from<T>()) { - return rxs::from<T>(); - } - /*! @copydoc rxcpp::sources::from(Coordination cn) - */ - template<class T, class Coordination> - static auto from(Coordination cn) - -> typename std::enable_if<is_coordination<Coordination>::value, - decltype( rxs::from<T>(std::move(cn)))>::type { - return rxs::from<T>(std::move(cn)); - } - /*! @copydoc rxcpp::sources::from(Value0 v0, ValueN... vn) - */ - template<class Value0, class... ValueN> - static auto from(Value0 v0, ValueN... vn) - -> typename std::enable_if<!is_coordination<Value0>::value, - decltype( rxs::from(v0, vn...))>::type { - return rxs::from(v0, vn...); - } - /*! @copydoc rxcpp::sources::from(Coordination cn, Value0 v0, ValueN... vn) - */ - template<class Coordination, class Value0, class... ValueN> - static auto from(Coordination cn, Value0 v0, ValueN... vn) - -> typename std::enable_if<is_coordination<Coordination>::value, - decltype( rxs::from(std::move(cn), v0, vn...))>::type { - return rxs::from(std::move(cn), v0, vn...); - } - - /*! @copydoc rxcpp::sources::just(Value0 v0) - */ - template<class T> - static auto just(T v) - -> decltype(rxs::just(std::move(v))) { - return rxs::just(std::move(v)); - } - /*! @copydoc rxcpp::sources::just(Value0 v0, Coordination cn) - */ - template<class T, class Coordination> - static auto just(T v, Coordination cn) - -> decltype(rxs::just(std::move(v), std::move(cn))) { - return rxs::just(std::move(v), std::move(cn)); - } - - /*! @copydoc rxcpp::sources::start_with(Observable o, Value0 v0, ValueN... vn) - */ - template<class Observable, class Value0, class... ValueN> - static auto start_with(Observable o, Value0 v0, ValueN... vn) - -> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...)) { - return rxs::start_with(std::move(o), std::move(v0), std::move(vn)...); - } - - /*! @copydoc rx-empty.hpp - */ - template<class T> - static auto empty() - -> decltype(from<T>()) { - return from<T>(); - } - /*! @copydoc rx-empty.hpp - */ - template<class T, class Coordination> - static auto empty(Coordination cn) - -> decltype(from<T>(std::move(cn))) { - return from<T>(std::move(cn)); - } - - /*! @copydoc rx-error.hpp - */ - template<class T, class Exception> - static auto error(Exception&& e) - -> decltype(rxs::error<T>(std::forward<Exception>(e))) { - return rxs::error<T>(std::forward<Exception>(e)); - } - /*! @copydoc rx-error.hpp - */ - template<class T, class Exception, class Coordination> - static auto error(Exception&& e, Coordination cn) - -> decltype(rxs::error<T>(std::forward<Exception>(e), std::move(cn))) { - return rxs::error<T>(std::forward<Exception>(e), std::move(cn)); - } - - /*! @copydoc rx-scope.hpp - */ - template<class ResourceFactory, class ObservableFactory> - static auto scope(ResourceFactory rf, ObservableFactory of) - -> decltype(rxs::scope(std::move(rf), std::move(of))) { - return rxs::scope(std::move(rf), std::move(of)); - } -}; - -} - -// -// support range() >> filter() >> subscribe() syntax -// '>>' is spelled 'stream' -// -template<class T, class SourceOperator, class OperatorFactory> -auto operator >> (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of) - -> decltype(source.op(std::forward<OperatorFactory>(of))) { - return source.op(std::forward<OperatorFactory>(of)); -} - -// -// support range() | filter() | subscribe() syntax -// '|' is spelled 'pipe' -// -template<class T, class SourceOperator, class OperatorFactory> -auto operator | (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of) - -> decltype(source.op(std::forward<OperatorFactory>(of))) { - return source.op(std::forward<OperatorFactory>(of)); -} - -#endif |