diff options
Diffstat (limited to 'Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp')
-rw-r--r-- | Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp | 118 |
1 files changed, 0 insertions, 118 deletions
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp b/Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp deleted file mode 100644 index 4f83767..0000000 --- a/Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. - -#pragma once - -#if !defined(RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP) -#define RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP - -#include "../rx-includes.hpp" - -namespace rxcpp { - -namespace schedulers { - -struct event_loop : public scheduler_interface -{ -private: - typedef event_loop this_type; - event_loop(const this_type&); - - struct loop_worker : public worker_interface - { - private: - typedef loop_worker this_type; - loop_worker(const this_type&); - - typedef detail::schedulable_queue< - typename clock_type::time_point> queue_item_time; - - typedef queue_item_time::item_type item_type; - - composite_subscription lifetime; - worker controller; - std::shared_ptr<const scheduler_interface> alive; - - public: - virtual ~loop_worker() - { - } - loop_worker(composite_subscription cs, worker w, std::shared_ptr<const scheduler_interface> alive) - : lifetime(cs) - , controller(w) - , alive(alive) - { - auto token = controller.add(cs); - cs.add([token, w](){ - w.remove(token); - }); - } - - virtual clock_type::time_point now() const { - return clock_type::now(); - } - - virtual void schedule(const schedulable& scbl) const { - controller.schedule(lifetime, scbl.get_action()); - } - - virtual void schedule(clock_type::time_point when, const schedulable& scbl) const { - controller.schedule(when, lifetime, scbl.get_action()); - } - }; - - mutable thread_factory factory; - scheduler newthread; - mutable std::atomic<std::size_t> count; - composite_subscription loops_lifetime; - std::vector<worker> loops; - -public: - event_loop() - : factory([](std::function<void()> start){ - return std::thread(std::move(start)); - }) - , newthread(make_new_thread()) - , count(0) - { - auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4)); - while (remaining--) { - loops.push_back(newthread.create_worker(loops_lifetime)); - } - } - explicit event_loop(thread_factory tf) - : factory(tf) - , newthread(make_new_thread(tf)) - , count(0) - { - auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4)); - while (remaining--) { - loops.push_back(newthread.create_worker(loops_lifetime)); - } - } - virtual ~event_loop() - { - loops_lifetime.unsubscribe(); - } - - virtual clock_type::time_point now() const { - return clock_type::now(); - } - - virtual worker create_worker(composite_subscription cs) const { - return worker(cs, std::make_shared<loop_worker>(cs, loops[++count % loops.size()], this->shared_from_this())); - } -}; - -inline scheduler make_event_loop() { - static scheduler instance = make_scheduler<event_loop>(); - return instance; -} -inline scheduler make_event_loop(thread_factory tf) { - return make_scheduler<event_loop>(tf); -} - -} - -} - -#endif |