diff options
Diffstat (limited to 'Rx/v2/src/rxcpp/operators/rx-reduce.hpp')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-reduce.hpp | 687 |
1 files changed, 0 insertions, 687 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-reduce.hpp b/Rx/v2/src/rxcpp/operators/rx-reduce.hpp deleted file mode 100644 index e5dc819..0000000 --- a/Rx/v2/src/rxcpp/operators/rx-reduce.hpp +++ /dev/null @@ -1,687 +0,0 @@ -// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. - -#pragma once - -/*! \file rx-reduce.hpp - - \brief For each item from this observable use Accumulator to combine items, when completed use ResultSelector to produce a value that will be emitted from the new observable that is returned. - - \tparam Seed the type of the initial value for the accumulator - \tparam Accumulator the type of the data accumulating function - \tparam ResultSelector the type of the result producing function - - \param seed the initial value for the accumulator - \param a an accumulator function to be invoked on each item emitted by the source observable, the result of which will be used in the next accumulator call - \param rs a result producing function that makes the final value from the last accumulator call result - - \return An observable that emits a single item that is the result of accumulating the output from the items emitted by the source observable. - - Some basic reduce-type operators have already been implemented: - - rxcpp::operators::first - - rxcpp::operators::last - - rxcpp::operators::count - - rxcpp::operators::sum - - rxcpp::operators::average - - rxcpp::operators::min - - rxcpp::operators::max - - \sample - Geometric mean of source values: - \snippet reduce.cpp reduce sample - \snippet output.txt reduce sample - - If the source observable completes without emitting any items, the resulting observable emits the result of passing the initial seed to the result selector: - \snippet reduce.cpp reduce empty sample - \snippet output.txt reduce empty sample - - If the accumulator raises an exception, it is returned by the resulting observable in on_error: - \snippet reduce.cpp reduce exception from accumulator sample - \snippet output.txt reduce exception from accumulator sample - - The same for exceptions raised by the result selector: - \snippet reduce.cpp reduce exception from result selector sample - \snippet output.txt reduce exception from result selector sample -*/ - -#if !defined(RXCPP_OPERATORS_RX_REDUCE_HPP) -#define RXCPP_OPERATORS_RX_REDUCE_HPP - -#include "../rx-includes.hpp" - -namespace rxcpp { - -namespace operators { - -namespace detail { - -template<class... AN> -struct reduce_invalid_arguments {}; - -template<class... AN> -struct reduce_invalid : public rxo::operator_base<reduce_invalid_arguments<AN...>> { - using type = observable<reduce_invalid_arguments<AN...>, reduce_invalid<AN...>>; -}; -template<class... AN> -using reduce_invalid_t = typename reduce_invalid<AN...>::type; - -template<class Seed, class ResultSelector> -struct is_result_function_for { - - typedef rxu::decay_t<ResultSelector> result_selector_type; - typedef rxu::decay_t<Seed> seed_type; - - struct tag_not_valid {}; - - template<class CS, class CRS> - static auto check(int) -> decltype((*(CRS*)nullptr)(*(CS*)nullptr)); - template<class CS, class CRS> - static tag_not_valid check(...); - - typedef rxu::decay_t<decltype(check<seed_type, result_selector_type>(0))> type; - static const bool value = !std::is_same<type, tag_not_valid>::value; -}; - -template<class T, class Observable, class Accumulator, class ResultSelector, class Seed> -struct reduce_traits -{ - typedef rxu::decay_t<Observable> source_type; - typedef rxu::decay_t<Accumulator> accumulator_type; - typedef rxu::decay_t<ResultSelector> result_selector_type; - typedef rxu::decay_t<Seed> seed_type; - - typedef T source_value_type; - - typedef typename is_result_function_for<seed_type, result_selector_type>::type value_type; -}; - -template<class T, class Observable, class Accumulator, class ResultSelector, class Seed> -struct reduce : public operator_base<rxu::value_type_t<reduce_traits<T, Observable, Accumulator, ResultSelector, Seed>>> -{ - typedef reduce<T, Observable, Accumulator, ResultSelector, Seed> this_type; - typedef reduce_traits<T, Observable, Accumulator, ResultSelector, Seed> traits; - - typedef typename traits::source_type source_type; - typedef typename traits::accumulator_type accumulator_type; - typedef typename traits::result_selector_type result_selector_type; - typedef typename traits::seed_type seed_type; - - typedef typename traits::source_value_type source_value_type; - typedef typename traits::value_type value_type; - - struct reduce_initial_type - { - ~reduce_initial_type() - { - } - reduce_initial_type(source_type o, accumulator_type a, result_selector_type rs, seed_type s) - : source(std::move(o)) - , accumulator(std::move(a)) - , result_selector(std::move(rs)) - , seed(std::move(s)) - { - } - source_type source; - accumulator_type accumulator; - result_selector_type result_selector; - seed_type seed; - - private: - reduce_initial_type& operator=(reduce_initial_type o) RXCPP_DELETE; - }; - reduce_initial_type initial; - - ~reduce() - { - } - reduce(source_type o, accumulator_type a, result_selector_type rs, seed_type s) - : initial(std::move(o), std::move(a), std::move(rs), std::move(s)) - { - } - template<class Subscriber> - void on_subscribe(Subscriber o) const { - struct reduce_state_type - : public reduce_initial_type - , public std::enable_shared_from_this<reduce_state_type> - { - reduce_state_type(reduce_initial_type i, Subscriber scrbr) - : reduce_initial_type(i) - , source(i.source) - , current(reduce_initial_type::seed) - , out(std::move(scrbr)) - { - } - source_type source; - seed_type current; - Subscriber out; - - private: - reduce_state_type& operator=(reduce_state_type o) RXCPP_DELETE; - }; - auto state = std::make_shared<reduce_state_type>(initial, std::move(o)); - state->source.subscribe( - state->out, - // on_next - [state](T t) { - seed_type next = state->accumulator(std::move(state->current), std::move(t)); - state->current = std::move(next); - }, - // on_error - [state](rxu::error_ptr e) { - state->out.on_error(e); - }, - // on_completed - [state]() { - auto result = on_exception( - [&](){return state->result_selector(std::move(state->current));}, - state->out); - if (result.empty()) { - return; - } - state->out.on_next(std::move(result.get())); - state->out.on_completed(); - } - ); - } -private: - reduce& operator=(reduce o) RXCPP_DELETE; -}; - -template<class T> -struct initialize_seeder { - typedef T seed_type; - static seed_type seed() { - return seed_type{}; - } -}; - -template<class T> -struct average { - struct seed_type - { - seed_type() - : value() - , count(0) - { - } - rxu::maybe<T> value; - int count; - rxu::detail::maybe<double> stage; - }; - static seed_type seed() { - return seed_type{}; - } - template<class U> - seed_type operator()(seed_type a, U&& v) { - if (a.count != 0 && - (a.count == std::numeric_limits<int>::max() || - ((v > 0) && (*(a.value) > (std::numeric_limits<T>::max() - v))) || - ((v < 0) && (*(a.value) < (std::numeric_limits<T>::min() - v))))) { - // would overflow, calc existing and reset for next batch - // this will add error to the final result, but the alternative - // is to fail on overflow - double avg = static_cast<double>(*(a.value)) / a.count; - if (!a.stage.empty()) { - a.stage.reset((*a.stage + avg) / 2); - } else { - a.stage.reset(avg); - } - a.value.reset(std::forward<U>(v)); - a.count = 1; - } else if (a.value.empty()) { - a.value.reset(std::forward<U>(v)); - a.count = 1; - } else { - *(a.value) += v; - ++a.count; - } - return a; - } - double operator()(seed_type a) { - if (!a.value.empty()) { - double avg = static_cast<double>(*(a.value)) / a.count; - if (!a.stage.empty()) { - avg = (*a.stage + avg) / 2; - } - return avg; - } - rxu::throw_exception(rxcpp::empty_error("average() requires a stream with at least one value")); - } -}; - -template<class T> -struct sum { - typedef rxu::maybe<T> seed_type; - static seed_type seed() { - return seed_type(); - } - template<class U> - seed_type operator()(seed_type a, U&& v) const { - if (a.empty()) - a.reset(std::forward<U>(v)); - else - *a = *a + v; - return a; - } - T operator()(seed_type a) const { - if (a.empty()) - rxu::throw_exception(rxcpp::empty_error("sum() requires a stream with at least one value")); - return *a; - } -}; - -template<class T> -struct max { - typedef rxu::maybe<T> seed_type; - static seed_type seed() { - return seed_type(); - } - template<class U> - seed_type operator()(seed_type a, U&& v) { - if (a.empty() || *a < v) - a.reset(std::forward<U>(v)); - return a; - } - T operator()(seed_type a) { - if (a.empty()) - rxu::throw_exception(rxcpp::empty_error("max() requires a stream with at least one value")); - return *a; - } -}; - -template<class T> -struct min { - typedef rxu::maybe<T> seed_type; - static seed_type seed() { - return seed_type(); - } - template<class U> - seed_type operator()(seed_type a, U&& v) { - if (a.empty() || v < *a) - a.reset(std::forward<U>(v)); - return a; - } - T operator()(seed_type a) { - if (a.empty()) - rxu::throw_exception(rxcpp::empty_error("min() requires a stream with at least one value")); - return *a; - } -}; - -template<class T> -struct first { - using seed_type = rxu::maybe<T>; - static seed_type seed() { - return seed_type(); - } - template<class U> - seed_type operator()(seed_type a, U&& v) { - a.reset(std::forward<U>(v)); - return a; - } - T operator()(seed_type a) { - if (a.empty()) { - rxu::throw_exception(rxcpp::empty_error("first() requires a stream with at least one value")); - } - return *a; - } -}; - -template<class T> -struct last { - using seed_type = rxu::maybe<T>; - static seed_type seed() { - return seed_type(); - } - template<class U> - seed_type operator()(seed_type a, U&& v) { - a.reset(std::forward<U>(v)); - return a; - } - T operator()(seed_type a) { - if (a.empty()) { - rxu::throw_exception(rxcpp::empty_error("last() requires a stream with at least one value")); - } - return *a; - } -}; - -} - -/*! @copydoc rx-reduce.hpp -*/ -template<class... AN> -auto reduce(AN&&... an) - -> operator_factory<reduce_tag, AN...> { - return operator_factory<reduce_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); -} - -/*! @copydoc rx-reduce.hpp -*/ -template<class... AN> -auto accumulate(AN&&... an) - -> operator_factory<reduce_tag, AN...> { - return operator_factory<reduce_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); -} - -/*! \brief For each item from this observable reduce it by sending only the first item. - - \return An observable that emits only the very first item emitted by the source observable. - - \sample - \snippet math.cpp first sample - \snippet output.txt first sample - - When the source observable calls on_error: - \snippet math.cpp first empty sample - \snippet output.txt first empty sample -*/ -inline auto first() - -> operator_factory<first_tag> { - return operator_factory<first_tag>(std::tuple<>{}); -} - -/*! \brief For each item from this observable reduce it by sending only the last item. - - \return An observable that emits only the very last item emitted by the source observable. - - \sample - \snippet math.cpp last sample - \snippet output.txt last sample - - When the source observable calls on_error: - \snippet math.cpp last empty sample - \snippet output.txt last empty sample -*/ -inline auto last() - -> operator_factory<last_tag> { - return operator_factory<last_tag>(std::tuple<>{}); -} - -/*! \brief For each item from this observable reduce it by incrementing a count. - - \return An observable that emits a single item: the number of elements emitted by the source observable. - - \sample - \snippet math.cpp count sample - \snippet output.txt count sample - - When the source observable calls on_error: - \snippet math.cpp count error sample - \snippet output.txt count error sample -*/ -inline auto count() - -> operator_factory<reduce_tag, int, rxu::count, rxu::detail::take_at<0>> { - return operator_factory<reduce_tag, int, rxu::count, rxu::detail::take_at<0>>(std::make_tuple(0, rxu::count(), rxu::take_at<0>())); -} - -/*! \brief For each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end. - - \return An observable that emits a single item: the average of elements emitted by the source observable. - - \sample - \snippet math.cpp average sample - \snippet output.txt average sample - - When the source observable completes without emitting any items: - \snippet math.cpp average empty sample - \snippet output.txt average empty sample - - When the source observable calls on_error: - \snippet math.cpp average error sample - \snippet output.txt average error sample -*/ -inline auto average() - -> operator_factory<average_tag> { - return operator_factory<average_tag>(std::tuple<>{}); -} - -/*! \brief For each item from this observable reduce it by adding to the previous items. - - \return An observable that emits a single item: the sum of elements emitted by the source observable. - - \sample - \snippet math.cpp sum sample - \snippet output.txt sum sample - - When the source observable completes without emitting any items: - \snippet math.cpp sum empty sample - \snippet output.txt sum empty sample - - When the source observable calls on_error: - \snippet math.cpp sum error sample - \snippet output.txt sum error sample -*/ -inline auto sum() - -> operator_factory<sum_tag> { - return operator_factory<sum_tag>(std::tuple<>{}); -} - -/*! \brief For each item from this observable reduce it by taking the min value of the previous items. - - \return An observable that emits a single item: the min of elements emitted by the source observable. - - \sample - \snippet math.cpp min sample - \snippet output.txt min sample - - When the source observable completes without emitting any items: - \snippet math.cpp min empty sample - \snippet output.txt min empty sample - - When the source observable calls on_error: - \snippet math.cpp min error sample - \snippet output.txt min error sample -*/ -inline auto min() - -> operator_factory<min_tag> { - return operator_factory<min_tag>(std::tuple<>{}); -} - -/*! \brief For each item from this observable reduce it by taking the max value of the previous items. - - \return An observable that emits a single item: the max of elements emitted by the source observable. - - \sample - \snippet math.cpp max sample - \snippet output.txt max sample - - When the source observable completes without emitting any items: - \snippet math.cpp max empty sample - \snippet output.txt max empty sample - - When the source observable calls on_error: - \snippet math.cpp max error sample - \snippet output.txt max error sample -*/ -inline auto max() - -> operator_factory<max_tag> { - return operator_factory<max_tag>(std::tuple<>{}); -} - -} - -template<> -struct member_overload<reduce_tag> -{ - - template<class Observable, class Seed, class Accumulator, class ResultSelector, - class Reduce = rxo::detail::reduce<rxu::value_type_t<Observable>, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>, - class Value = rxu::value_type_t<Reduce>, - class Result = observable<Value, Reduce>> - static Result member(Observable&& o, Seed&& s, Accumulator&& a, ResultSelector&& r) - { - return Result(Reduce(std::forward<Observable>(o), std::forward<Accumulator>(a), std::forward<ResultSelector>(r), std::forward<Seed>(s))); - } - - template<class Observable, class Seed, class Accumulator, - class ResultSelector=rxu::detail::take_at<0>, - class Reduce = rxo::detail::reduce<rxu::value_type_t<Observable>, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>, - class Value = rxu::value_type_t<Reduce>, - class Result = observable<Value, Reduce>> - static Result member(Observable&& o, Seed&& s, Accumulator&& a) - { - return Result(Reduce(std::forward<Observable>(o), std::forward<Accumulator>(a), rxu::detail::take_at<0>(), std::forward<Seed>(s))); - } - - template<class... AN> - static operators::detail::reduce_invalid_t<AN...> member(AN...) { - std::terminate(); - return {}; - static_assert(sizeof...(AN) == 10000, "reduce takes (Seed, Accumulator, optional ResultSelector), Accumulator takes (Seed, Observable::value_type) -> Seed, ResultSelector takes (Observable::value_type) -> ResultValue"); - } -}; - -template<> -struct member_overload<first_tag> -{ - template<class Observable, - class SValue = rxu::value_type_t<Observable>, - class Operation = operators::detail::first<SValue>, - class Seed = decltype(Operation::seed()), - class Accumulator = Operation, - class ResultSelector = Operation, - class TakeOne = decltype(((rxu::decay_t<Observable>*)nullptr)->take(1)), - class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<TakeOne>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>, - class RValue = rxu::value_type_t<Reduce>, - class Result = observable<RValue, Reduce>> - static Result member(Observable&& o) - { - return Result(Reduce(o.take(1), Operation{}, Operation{}, Operation::seed())); - } - - template<class... AN> - static operators::detail::reduce_invalid_t<AN...> member(AN...) { - std::terminate(); - return {}; - static_assert(sizeof...(AN) == 10000, "first does not support Observable::value_type"); - } -}; - -template<> -struct member_overload<last_tag> -{ - template<class Observable, - class SValue = rxu::value_type_t<Observable>, - class Operation = operators::detail::last<SValue>, - class Seed = decltype(Operation::seed()), - class Accumulator = Operation, - class ResultSelector = Operation, - class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>, - class RValue = rxu::value_type_t<Reduce>, - class Result = observable<RValue, Reduce>> - static Result member(Observable&& o) - { - return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed())); - } - - template<class... AN> - static operators::detail::reduce_invalid_t<AN...> member(AN...) { - std::terminate(); - return {}; - static_assert(sizeof...(AN) == 10000, "last does not support Observable::value_type"); - } -}; - -template<> -struct member_overload<sum_tag> -{ - template<class Observable, - class SValue = rxu::value_type_t<Observable>, - class Operation = operators::detail::sum<SValue>, - class Seed = decltype(Operation::seed()), - class Accumulator = Operation, - class ResultSelector = Operation, - class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>, - class RValue = rxu::value_type_t<Reduce>, - class Result = observable<RValue, Reduce>> - static Result member(Observable&& o) - { - return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed())); - } - - template<class... AN> - static operators::detail::reduce_invalid_t<AN...> member(AN...) { - std::terminate(); - return {}; - static_assert(sizeof...(AN) == 10000, "sum does not support Observable::value_type"); - } -}; - -template<> -struct member_overload<average_tag> -{ - template<class Observable, - class SValue = rxu::value_type_t<Observable>, - class Operation = operators::detail::average<SValue>, - class Seed = decltype(Operation::seed()), - class Accumulator = Operation, - class ResultSelector = Operation, - class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>, - class RValue = rxu::value_type_t<Reduce>, - class Result = observable<RValue, Reduce>> - static Result member(Observable&& o) - { - return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed())); - } - - template<class... AN> - static operators::detail::reduce_invalid_t<AN...> member(AN...) { - std::terminate(); - return {}; - static_assert(sizeof...(AN) == 10000, "average does not support Observable::value_type"); - } -}; - -template<> -struct member_overload<max_tag> -{ - template<class Observable, - class SValue = rxu::value_type_t<Observable>, - class Operation = operators::detail::max<SValue>, - class Seed = decltype(Operation::seed()), - class Accumulator = Operation, - class ResultSelector = Operation, - class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>, - class RValue = rxu::value_type_t<Reduce>, - class Result = observable<RValue, Reduce>> - static Result member(Observable&& o) - { - return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed())); - } - - template<class... AN> - static operators::detail::reduce_invalid_t<AN...> member(AN...) { - std::terminate(); - return {}; - static_assert(sizeof...(AN) == 10000, "max does not support Observable::value_type"); - } -}; - -template<> -struct member_overload<min_tag> -{ - template<class Observable, - class SValue = rxu::value_type_t<Observable>, - class Operation = operators::detail::min<SValue>, - class Seed = decltype(Operation::seed()), - class Accumulator = Operation, - class ResultSelector = Operation, - class Reduce = rxo::detail::reduce<SValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<ResultSelector>, rxu::decay_t<Seed>>, - class RValue = rxu::value_type_t<Reduce>, - class Result = observable<RValue, Reduce>> - static Result member(Observable&& o) - { - return Result(Reduce(std::forward<Observable>(o), Operation{}, Operation{}, Operation::seed())); - } - - template<class... AN> - static operators::detail::reduce_invalid_t<AN...> member(AN...) { - std::terminate(); - return {}; - static_assert(sizeof...(AN) == 10000, "min does not support Observable::value_type"); - } -}; - -} - -#endif |