summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/operators/rx-timestamp.hpp
blob: 923cf5dcb43c135f95be04731515a8d4b39ce3bf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#pragma once

/*! \file rx-timestamp.hpp

    \brief Returns an observable that attaches a timestamp to each item emitted by the source observable indicating when it was emitted.

    \tparam Coordination  the type of the scheduler (optional).

    \param coordination  the scheduler to manage timeout for each event (optional).

    \return  Observable that emits a pair: { item emitted by the source observable, time_point representing the current value of the clock }.

    \sample
    \snippet timestamp.cpp timestamp sample
    \snippet output.txt timestamp sample
*/

#if !defined(RXCPP_OPERATORS_RX_TIMESTAMP_HPP)
#define RXCPP_OPERATORS_RX_TIMESTAMP_HPP

#include "../rx-includes.hpp"

namespace rxcpp {

namespace operators {

namespace detail {

template<class... AN>
struct timestamp_invalid_arguments {};

template<class... AN>
struct timestamp_invalid : public rxo::operator_base<timestamp_invalid_arguments<AN...>> {
    using type = observable<timestamp_invalid_arguments<AN...>, timestamp_invalid<AN...>>;
};
template<class... AN>
using timestamp_invalid_t = typename timestamp_invalid<AN...>::type;

template<class T, class Coordination>
struct timestamp
{
    typedef rxu::decay_t<T> source_value_type;
    typedef rxu::decay_t<Coordination> coordination_type;

    struct timestamp_values {
        timestamp_values(coordination_type c)
            : coordination(c)
        {
        }

        coordination_type coordination;
    };
    timestamp_values initial;

    timestamp(coordination_type coordination)
        : initial(coordination)
    {
    }

    template<class Subscriber>
    struct timestamp_observer
    {
        typedef timestamp_observer<Subscriber> this_type;
        typedef source_value_type value_type;
        typedef rxu::decay_t<Subscriber> dest_type;
        typedef observer<value_type, this_type> observer_type;
        dest_type dest;
        coordination_type coord;

        timestamp_observer(dest_type d, coordination_type coordination)
            : dest(std::move(d)),
              coord(std::move(coordination))
        {
        }

        void on_next(source_value_type v) const {
            dest.on_next(std::make_pair(v, coord.now()));
        }
        void on_error(rxu::error_ptr e) const {
            dest.on_error(e);
        }
        void on_completed() const {
            dest.on_completed();
        }

        static subscriber<value_type, observer_type> make(dest_type d, timestamp_values v) {
            return make_subscriber<value_type>(d, this_type(d, v.coordination));
        }
    };

    template<class Subscriber>
    auto operator()(Subscriber dest) const
        -> decltype(timestamp_observer<Subscriber>::make(std::move(dest), initial)) {
        return      timestamp_observer<Subscriber>::make(std::move(dest), initial);
    }
};

}

/*! @copydoc rx-timestamp.hpp
*/
template<class... AN>
auto timestamp(AN&&... an)
    ->      operator_factory<timestamp_tag, AN...> {
     return operator_factory<timestamp_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
}

}

template<>
struct member_overload<timestamp_tag>
{
    template<class Observable,
        class Enabled = rxu::enable_if_all_true_type_t<
            is_observable<Observable>>,
        class SourceValue = rxu::value_type_t<Observable>,
        class Timestamp = rxo::detail::timestamp<SourceValue, identity_one_worker>,
        class Clock = typename rxsc::scheduler::clock_type::time_point,
        class Value = std::pair<SourceValue, Clock>>
    static auto member(Observable&& o)
        -> decltype(o.template lift<Value>(Timestamp(identity_current_thread()))) {
        return      o.template lift<Value>(Timestamp(identity_current_thread()));
    }

    template<class Observable, class Coordination,
        class Enabled = rxu::enable_if_all_true_type_t<
            is_observable<Observable>,
            is_coordination<Coordination>>,
        class SourceValue = rxu::value_type_t<Observable>,
        class Timestamp = rxo::detail::timestamp<SourceValue, rxu::decay_t<Coordination>>,
        class Clock = typename rxsc::scheduler::clock_type::time_point,
        class Value = std::pair<SourceValue, Clock>>
    static auto member(Observable&& o, Coordination&& cn)
        -> decltype(o.template lift<Value>(Timestamp(std::forward<Coordination>(cn)))) {
        return      o.template lift<Value>(Timestamp(std::forward<Coordination>(cn)));
    }

    template<class... AN>
    static operators::detail::timestamp_invalid_t<AN...> member(AN...) {
        std::terminate();
        return {};
        static_assert(sizeof...(AN) == 10000, "timestamp takes (optional Coordination)");
    }
};

}

#endif