diff options
Diffstat (limited to 'Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp')
-rw-r--r-- | Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp | 185 |
1 files changed, 0 insertions, 185 deletions
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp deleted file mode 100644 index 7c93469..0000000 --- a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp +++ /dev/null @@ -1,185 +0,0 @@ -// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. - -#pragma once - -#if !defined(RXCPP_RX_SCHEDULER_NEW_THREAD_HPP) -#define RXCPP_RX_SCHEDULER_NEW_THREAD_HPP - -#include "../rx-includes.hpp" - -namespace rxcpp { - -namespace schedulers { - -typedef std::function<std::thread(std::function<void()>)> thread_factory; - -struct new_thread : public scheduler_interface -{ -private: - typedef new_thread this_type; - new_thread(const this_type&); - - struct new_worker : public worker_interface - { - private: - typedef new_worker this_type; - - typedef detail::action_queue queue_type; - - new_worker(const this_type&); - - struct new_worker_state : public std::enable_shared_from_this<new_worker_state> - { - typedef detail::schedulable_queue< - typename clock_type::time_point> queue_item_time; - - typedef queue_item_time::item_type item_type; - - virtual ~new_worker_state() - { - // Ensure that std::thread is no longer joinable, - // otherwise the destructor will call std::terminate. - if (!worker.joinable()) { - return; - } - if (worker.get_id() != std::this_thread::get_id()) { - worker.join(); - } else { - worker.detach(); - } - } - - explicit new_worker_state(composite_subscription cs) - : lifetime(cs) - { - } - - composite_subscription lifetime; - mutable std::mutex lock; - mutable std::condition_variable wake; - mutable queue_item_time q; - std::thread worker; - recursion r; - }; - - std::shared_ptr<new_worker_state> state; - - public: - virtual ~new_worker() - { - } - - explicit new_worker(std::shared_ptr<new_worker_state> ws) - : state(ws) - { - } - - new_worker(composite_subscription cs, thread_factory& tf) - : state(std::make_shared<new_worker_state>(cs)) - { - auto keepAlive = state; - - state->lifetime.add([keepAlive](){ - std::unique_lock<std::mutex> guard(keepAlive->lock); - auto expired = std::move(keepAlive->q); - keepAlive->q = new_worker_state::queue_item_time{}; - if (!keepAlive->q.empty()) std::terminate(); - keepAlive->wake.notify_one(); - - // ~new_worker_state cleans up the std::thread - }); - - state->worker = tf([keepAlive](){ - - // take ownership - queue_type::ensure(std::make_shared<new_worker>(keepAlive)); - // release ownership - RXCPP_UNWIND_AUTO([]{ - queue_type::destroy(); - }); - - for(;;) { - std::unique_lock<std::mutex> guard(keepAlive->lock); - if (keepAlive->q.empty()) { - keepAlive->wake.wait(guard, [keepAlive](){ - return !keepAlive->lifetime.is_subscribed() || !keepAlive->q.empty(); - }); - } - if (!keepAlive->lifetime.is_subscribed()) { - break; - } - auto& peek = keepAlive->q.top(); - if (!peek.what.is_subscribed()) { - keepAlive->q.pop(); - continue; - } - if (clock_type::now() < peek.when) { - keepAlive->wake.wait_until(guard, peek.when); - continue; - } - auto what = peek.what; - keepAlive->q.pop(); - keepAlive->r.reset(keepAlive->q.empty()); - guard.unlock(); - what(keepAlive->r.get_recurse()); - } - }); - } - - virtual clock_type::time_point now() const { - return clock_type::now(); - } - - virtual void schedule(const schedulable& scbl) const { - schedule(now(), scbl); - } - - virtual void schedule(clock_type::time_point when, const schedulable& scbl) const { - if (scbl.is_subscribed()) { - std::unique_lock<std::mutex> guard(state->lock); - state->q.push(new_worker_state::item_type(when, scbl)); - state->r.reset(false); - } - state->wake.notify_one(); - } - }; - - mutable thread_factory factory; - -public: - new_thread() - : factory([](std::function<void()> start){ - return std::thread(std::move(start)); - }) - { - } - explicit new_thread(thread_factory tf) - : factory(tf) - { - } - virtual ~new_thread() - { - } - - virtual clock_type::time_point now() const { - return clock_type::now(); - } - - virtual worker create_worker(composite_subscription cs) const { - return worker(cs, std::make_shared<new_worker>(cs, factory)); - } -}; - -inline scheduler make_new_thread() { - static scheduler instance = make_scheduler<new_thread>(); - return instance; -} -inline scheduler make_new_thread(thread_factory tf) { - return make_scheduler<new_thread>(tf); -} - -} - -} - -#endif |