diff options
Diffstat (limited to 'Rx/v2/src/rxcpp/operators/rx-zip.hpp')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-zip.hpp | 344 |
1 files changed, 0 insertions, 344 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-zip.hpp b/Rx/v2/src/rxcpp/operators/rx-zip.hpp deleted file mode 100644 index b8169fd..0000000 --- a/Rx/v2/src/rxcpp/operators/rx-zip.hpp +++ /dev/null @@ -1,344 +0,0 @@ -// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. - -#pragma once - -#if !defined(RXCPP_OPERATORS_RX_ZIP_HPP) -#define RXCPP_OPERATORS_RX_ZIP_HPP - -#include "../rx-includes.hpp" - -/*! \file rx-zip.hpp - - \brief Bring by one item from all given observables and select a value to emit from the new observable that is returned. - - \tparam AN types of scheduler (optional), aggregate function (optional), and source observables - - \param an scheduler (optional), aggregation function (optional), and source observables - - \return Observable that emits the result of combining the items emitted and brought by one from each of the source observables. - - If scheduler is omitted, identity_current_thread is used. - - If aggregation function is omitted, the resulting observable returns tuples of emitted items. - - \sample - - Neither scheduler nor aggregation function are present: - \snippet zip.cpp zip sample - \snippet output.txt zip sample - - Only scheduler is present: - \snippet zip.cpp Coordination zip sample - \snippet output.txt Coordination zip sample - - Only aggregation function is present: - \snippet zip.cpp Selector zip sample - \snippet output.txt Selector zip sample - - Both scheduler and aggregation function are present: - \snippet zip.cpp Coordination+Selector zip sample - \snippet output.txt Coordination+Selector zip sample -*/ - -namespace rxcpp { - -namespace operators { - -namespace detail { - -template<class Observable> -struct zip_source_state -{ - using value_type = rxu::value_type_t<Observable>; - zip_source_state() - : completed(false) - { - } - std::list<value_type> values; - bool completed; -}; - -struct values_not_empty { - template<class Observable> - bool operator()(zip_source_state<Observable>& source) const { - return !source.values.empty(); - } -}; - -struct source_completed_values_empty { - template<class Observable> - bool operator()(zip_source_state<Observable>& source) const { - return source.completed && source.values.empty(); - } -}; - -struct extract_value_front { - template<class Observable, class Value = rxu::value_type_t<Observable>> - Value operator()(zip_source_state<Observable>& source) const { - auto val = std::move(source.values.front()); - source.values.pop_front(); - return val; - } -}; - -template<class... AN> -struct zip_invalid_arguments {}; - -template<class... AN> -struct zip_invalid : public rxo::operator_base<zip_invalid_arguments<AN...>> { - using type = observable<zip_invalid_arguments<AN...>, zip_invalid<AN...>>; -}; -template<class... AN> -using zip_invalid_t = typename zip_invalid<AN...>::type; - -template<class Selector, class... ObservableN> -struct is_zip_selector_check { - typedef rxu::decay_t<Selector> selector_type; - - struct tag_not_valid; - template<class CS, class... CON> - static auto check(int) -> decltype((*(CS*)nullptr)((*(typename CON::value_type*)nullptr)...)); - template<class CS, class... CON> - static tag_not_valid check(...); - - using type = decltype(check<selector_type, rxu::decay_t<ObservableN>...>(0)); - - static const bool value = !std::is_same<type, tag_not_valid>::value; -}; - -template<class Selector, class... ObservableN> -struct invalid_zip_selector { - static const bool value = false; -}; - -template<class Selector, class... ObservableN> -struct is_zip_selector : public std::conditional< - is_zip_selector_check<Selector, ObservableN...>::value, - is_zip_selector_check<Selector, ObservableN...>, - invalid_zip_selector<Selector, ObservableN...>>::type { -}; - -template<class Selector, class... ON> -using result_zip_selector_t = typename is_zip_selector<Selector, ON...>::type; - -template<class Coordination, class Selector, class... ObservableN> -struct zip_traits { - typedef std::tuple<rxu::decay_t<ObservableN>...> tuple_source_type; - typedef std::tuple<zip_source_state<ObservableN>...> tuple_source_values_type; - - typedef rxu::decay_t<Selector> selector_type; - typedef rxu::decay_t<Coordination> coordination_type; - - typedef typename is_zip_selector<selector_type, ObservableN...>::type value_type; -}; - -template<class Coordination, class Selector, class... ObservableN> -struct zip : public operator_base<rxu::value_type_t<zip_traits<Coordination, Selector, ObservableN...>>> -{ - typedef zip<Coordination, Selector, ObservableN...> this_type; - - typedef zip_traits<Coordination, Selector, ObservableN...> traits; - - typedef typename traits::tuple_source_type tuple_source_type; - typedef typename traits::tuple_source_values_type tuple_source_values_type; - - typedef typename traits::selector_type selector_type; - - typedef typename traits::coordination_type coordination_type; - typedef typename coordination_type::coordinator_type coordinator_type; - - struct values - { - values(tuple_source_type o, selector_type s, coordination_type sf) - : source(std::move(o)) - , selector(std::move(s)) - , coordination(std::move(sf)) - { - } - tuple_source_type source; - selector_type selector; - coordination_type coordination; - }; - values initial; - - zip(coordination_type sf, selector_type s, tuple_source_type ts) - : initial(std::move(ts), std::move(s), std::move(sf)) - { - } - - template<int Index, class State> - void subscribe_one(std::shared_ptr<State> state) const { - - typedef typename std::tuple_element<Index, tuple_source_type>::type::value_type source_value_type; - - composite_subscription innercs; - - // when the out observer is unsubscribed all the - // inner subscriptions are unsubscribed as well - state->out.add(innercs); - - auto source = on_exception( - [&](){return state->coordinator.in(std::get<Index>(state->source));}, - state->out); - if (source.empty()) { - return; - } - - // this subscribe does not share the observer subscription - // so that when it is unsubscribed the observer can be called - // until the inner subscriptions have finished - auto sink = make_subscriber<source_value_type>( - state->out, - innercs, - // on_next - [state](source_value_type st) { - auto& values = std::get<Index>(state->pending).values; - values.push_back(st); - if (rxu::apply_to_each(state->pending, values_not_empty(), rxu::all_values_true())) { - auto selectedResult = rxu::apply_to_each(state->pending, extract_value_front(), state->selector); - state->out.on_next(selectedResult); - } - if (rxu::apply_to_each(state->pending, source_completed_values_empty(), rxu::any_value_true())) { - state->out.on_completed(); - } - }, - // on_error - [state](rxu::error_ptr e) { - state->out.on_error(e); - }, - // on_completed - [state]() { - auto& completed = std::get<Index>(state->pending).completed; - completed = true; - if (--state->pendingCompletions == 0) { - state->out.on_completed(); - } - } - ); - auto selectedSink = on_exception( - [&](){return state->coordinator.out(sink);}, - state->out); - if (selectedSink.empty()) { - return; - } - source->subscribe(std::move(selectedSink.get())); - } - template<class State, int... IndexN> - void subscribe_all(std::shared_ptr<State> state, rxu::values<int, IndexN...>) const { - bool subscribed[] = {(subscribe_one<IndexN>(state), true)...}; - subscribed[0] = (*subscribed); // silence warning - } - - template<class Subscriber> - void on_subscribe(Subscriber scbr) const { - static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber"); - - typedef Subscriber output_type; - - struct zip_state_type - : public std::enable_shared_from_this<zip_state_type> - , public values - { - zip_state_type(values i, coordinator_type coor, output_type oarg) - : values(std::move(i)) - , pendingCompletions(sizeof... (ObservableN)) - , valuesSet(0) - , coordinator(std::move(coor)) - , out(std::move(oarg)) - { - } - - // on_completed on the output must wait until all the - // subscriptions have received on_completed - mutable int pendingCompletions; - mutable int valuesSet; - mutable tuple_source_values_type pending; - coordinator_type coordinator; - output_type out; - }; - - auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription()); - - // take a copy of the values for each subscription - auto state = std::make_shared<zip_state_type>(initial, std::move(coordinator), std::move(scbr)); - - subscribe_all(state, typename rxu::values_from<int, sizeof...(ObservableN)>::type()); - } -}; - -} - -/*! @copydoc rx-zip.hpp -*/ -template<class... AN> -auto zip(AN&&... an) - -> operator_factory<zip_tag, AN...> { - return operator_factory<zip_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); -} - -} - -template<> -struct member_overload<zip_tag> -{ - template<class Observable, class... ObservableN, - class Enabled = rxu::enable_if_all_true_type_t< - all_observables<Observable, ObservableN...>>, - class Zip = rxo::detail::zip<identity_one_worker, rxu::detail::pack, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>, - class Value = rxu::value_type_t<Zip>, - class Result = observable<Value, Zip>> - static Result member(Observable&& o, ObservableN&&... on) - { - return Result(Zip(identity_current_thread(), rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...))); - } - - template<class Observable, class Selector, class... ObservableN, - class Enabled = rxu::enable_if_all_true_type_t< - operators::detail::is_zip_selector<Selector, Observable, ObservableN...>, - all_observables<Observable, ObservableN...>>, - class ResolvedSelector = rxu::decay_t<Selector>, - class Zip = rxo::detail::zip<identity_one_worker, ResolvedSelector, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>, - class Value = rxu::value_type_t<Zip>, - class Result = observable<Value, Zip>> - static Result member(Observable&& o, Selector&& s, ObservableN&&... on) - { - return Result(Zip(identity_current_thread(), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...))); - } - - template<class Coordination, class Observable, class... ObservableN, - class Enabled = rxu::enable_if_all_true_type_t< - is_coordination<Coordination>, - all_observables<Observable, ObservableN...>>, - class Zip = rxo::detail::zip<Coordination, rxu::detail::pack, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>, - class Value = rxu::value_type_t<Zip>, - class Result = observable<Value, Zip>> - static Result member(Observable&& o, Coordination&& cn, ObservableN&&... on) - { - return Result(Zip(std::forward<Coordination>(cn), rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...))); - } - - template<class Coordination, class Selector, class Observable, class... ObservableN, - class Enabled = rxu::enable_if_all_true_type_t< - is_coordination<Coordination>, - operators::detail::is_zip_selector<Selector, Observable, ObservableN...>, - all_observables<Observable, ObservableN...>>, - class ResolvedSelector = rxu::decay_t<Selector>, - class Zip = rxo::detail::zip<Coordination, ResolvedSelector, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>, - class Value = rxu::value_type_t<Zip>, - class Result = observable<Value, Zip>> - static Result member(Observable&& o, Coordination&& cn, Selector&& s, ObservableN&&... on) - { - return Result(Zip(std::forward<Coordination>(cn), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...))); - } - - template<class... AN> - static operators::detail::zip_invalid_t<AN...> member(const AN&...) { - std::terminate(); - return {}; - static_assert(sizeof...(AN) == 10000, "zip takes (optional Coordination, optional Selector, required Observable, optional Observable...), Selector takes (Observable::value_type...)"); - } -}; - -} - -#endif |