summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/operators/rx-publish.hpp
blob: bc686fcfefab9ad8b810e66732e1aee22840905f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#pragma once

/*! \file rx-publish.hpp

    \brief Without an initial item: turn a cold observable hot and allow connections to the source to be independent of subscriptions.
           With an initial item (\a first): turn a cold observable hot, send the most recent value to any new subscriber, and allow connections to the source to be independent of subscriptions.

    \tparam  T  the type of the emitted item (optional).

    \param  first  an initial item to be emitted by the resulting observable at connection time before emitting the items from the source observable; not emitted to observers that subscribe after the time of connection (optional).
    \param  cs  the subscription to control lifetime (optional).

    \return  rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers.

    \sample
    \snippet publish.cpp publish subject sample
    \snippet output.txt publish subject sample

    \sample
    \snippet publish.cpp publish behavior sample
    \snippet output.txt publish behavior sample

    \sample
    \snippet publish.cpp publish diamond samethread sample
    \snippet output.txt publish diamond samethread sample

    \sample
    \snippet publish.cpp publish diamond bgthread sample
    \snippet output.txt publish diamond bgthread sample

    \sample
    \snippet ref_count.cpp ref_count other diamond sample
    \snippet output.txt ref_count other diamond sample
*/

#if !defined(RXCPP_OPERATORS_RX_PUBLISH_HPP)
#define RXCPP_OPERATORS_RX_PUBLISH_HPP

#include "../rx-includes.hpp"
#include "./rx-multicast.hpp"

namespace rxcpp {

namespace operators {

namespace detail {

// Empty tag type that records the argument pack of a rejected publish call.
template<class... AN>
struct publish_invalid_arguments {};

// Placeholder operator used as the return type of the catch-all overloads
// below; `type` is an observable built from the placeholder value type and
// this placeholder operator.
template<class... AN>
struct publish_invalid
    : rxo::operator_base<publish_invalid_arguments<AN...>>
{
    using type = observable<
        publish_invalid_arguments<AN...>,
        publish_invalid<AN...>>;
};

// Shorthand for the observable type exposed by publish_invalid.
template<class... AN>
using publish_invalid_t = typename publish_invalid<AN...>::type;

}

/*! @copydoc rx-publish.hpp
*/
template<class... AN>
auto publish(AN&&... an)
    ->      operator_factory<publish_tag, AN...> {
    // Capture the forwarded arguments in a tuple and wrap them in a factory
    // tagged with publish_tag.
    using factory_type = operator_factory<publish_tag, AN...>;
    return factory_type(std::make_tuple(std::forward<AN>(an)...));
}

/*! \brief Turn a cold observable hot and allow connections to the source to be independent of subscriptions.

    \tparam  Coordination  the type of the scheduler.

    \param  cn  a scheduler all values are queued and delivered on.
    \param  cs  the subscription to control lifetime (optional).

    \return  rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers, on the specified scheduler.

    \sample
    \snippet publish.cpp publish_synchronized sample
    \snippet output.txt publish_synchronized sample
*/
template<class... AN>
auto publish_synchronized(AN&&... an)
    ->      operator_factory<publish_synchronized_tag, AN...> {
    // Capture the forwarded arguments in a tuple and wrap them in a factory
    // tagged with publish_synchronized_tag.
    using factory_type = operator_factory<publish_synchronized_tag, AN...>;
    return factory_type(std::make_tuple(std::forward<AN>(an)...));
}

}

// Overload set selected by publish_tag. Each valid overload pairs the source
// observable with a newly constructed subject inside rxo::detail::multicast and
// returns the result as a connectable_observable. Overloads without an initial
// item use rxsub::subject; the overload with one uses rxsub::behavior.
template<>
struct member_overload<publish_tag>
{
    // publish(): the subject is constructed from a default composite_subscription.
    template<class Observable,
        class Enabled = rxu::enable_if_all_true_type_t<
            is_observable<Observable>>,
        class SourceValue = rxu::value_type_t<Observable>,
        class Subject = rxsub::subject<SourceValue>,
        class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
        class Result = connectable_observable<SourceValue, Multicast>
        >
    static Result member(Observable&& o) {
        return Result(Multicast(std::forward<Observable>(o), Subject(composite_subscription())));
    }

    // publish(cs): the subject is constructed from the caller's subscription.
    template<class Observable,
        class Enabled = rxu::enable_if_all_true_type_t<
            is_observable<Observable>>,
        class SourceValue = rxu::value_type_t<Observable>,
        class Subject = rxsub::subject<SourceValue>,
        class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
        class Result = connectable_observable<SourceValue, Multicast>
        >
    static Result member(Observable&& o, composite_subscription cs) {
        // cs was received by value and is not used again: move it into the
        // subject rather than making a second copy.
        return Result(Multicast(std::forward<Observable>(o), Subject(std::move(cs))));
    }

    // publish(first, cs = {}): the subject is an rxsub::behavior constructed
    // from the initial item and the subscription.
    template<class Observable, class T,
        class Enabled = rxu::enable_if_all_true_type_t<
            is_observable<Observable>>,
        class SourceValue = rxu::value_type_t<Observable>,
        class Subject = rxsub::behavior<SourceValue>,
        class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
        class Result = connectable_observable<SourceValue, Multicast>
        >
    static Result member(Observable&& o, T first, composite_subscription cs = composite_subscription()) {
        // Both by-value parameters are consumed here, so move them instead of
        // copying them into the subject.
        return Result(Multicast(std::forward<Observable>(o), Subject(std::move(first), std::move(cs))));
    }

    // Catch-all for any other argument list. Instantiating the body trips the
    // always-false static_assert so the compiler reports the accepted call
    // forms; the runtime statements exist only to give the function a
    // well-formed body.
    template<class... AN>
    static operators::detail::publish_invalid_t<AN...> member(AN...) {
        static_assert(sizeof...(AN) == 10000, "publish takes (optional CompositeSubscription) or (T, optional CompositeSubscription)");
        std::terminate();
        return {};
    }
};

// Overload set selected by publish_synchronized_tag. The valid overload pairs
// the source observable with an rxsub::synchronize subject inside
// rxo::detail::multicast and returns the result as a connectable_observable.
template<>
struct member_overload<publish_synchronized_tag>
{
    // publish_synchronized(cn, cs = {}): the subject is constructed from the
    // forwarded coordination and the subscription.
    template<class Observable, class Coordination,
        class Enabled = rxu::enable_if_all_true_type_t<
            is_observable<Observable>,
            is_coordination<Coordination>>,
        class SourceValue = rxu::value_type_t<Observable>,
        class Subject = rxsub::synchronize<SourceValue, rxu::decay_t<Coordination>>,
        class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
        class Result = connectable_observable<SourceValue, Multicast>
        >
    static Result member(Observable&& o, Coordination&& cn, composite_subscription cs = composite_subscription()) {
        // cn is perfectly forwarded; cs was received by value and is not used
        // again, so move it rather than copying it a second time.
        return Result(Multicast(std::forward<Observable>(o), Subject(std::forward<Coordination>(cn), std::move(cs))));
    }

    // Catch-all for any other argument list. Instantiating the body trips the
    // always-false static_assert so the compiler reports the accepted call
    // form; the runtime statements exist only to give the function a
    // well-formed body.
    template<class... AN>
    static operators::detail::publish_invalid_t<AN...> member(AN...) {
        static_assert(sizeof...(AN) == 10000, "publish_synchronized takes (Coordination, optional CompositeSubscription)");
        std::terminate();
        return {};
    }
};

}

#endif