summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp')
-rw-r--r--Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp185
1 files changed, 0 insertions, 185 deletions
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
deleted file mode 100644
index 7c93469..0000000
--- a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
+++ /dev/null
@@ -1,185 +0,0 @@
-// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
-
-#pragma once
-
-#if !defined(RXCPP_RX_SCHEDULER_NEW_THREAD_HPP)
-#define RXCPP_RX_SCHEDULER_NEW_THREAD_HPP
-
-#include "../rx-includes.hpp"
-
-namespace rxcpp {
-
-namespace schedulers {
-
-typedef std::function<std::thread(std::function<void()>)> thread_factory;
-
-struct new_thread : public scheduler_interface
-{
-private:
- typedef new_thread this_type;
- new_thread(const this_type&);
-
- struct new_worker : public worker_interface
- {
- private:
- typedef new_worker this_type;
-
- typedef detail::action_queue queue_type;
-
- new_worker(const this_type&);
-
- struct new_worker_state : public std::enable_shared_from_this<new_worker_state>
- {
- typedef detail::schedulable_queue<
- typename clock_type::time_point> queue_item_time;
-
- typedef queue_item_time::item_type item_type;
-
- virtual ~new_worker_state()
- {
- // Ensure that std::thread is no longer joinable,
- // otherwise the destructor will call std::terminate.
- if (!worker.joinable()) {
- return;
- }
- if (worker.get_id() != std::this_thread::get_id()) {
- worker.join();
- } else {
- worker.detach();
- }
- }
-
- explicit new_worker_state(composite_subscription cs)
- : lifetime(cs)
- {
- }
-
- composite_subscription lifetime;
- mutable std::mutex lock;
- mutable std::condition_variable wake;
- mutable queue_item_time q;
- std::thread worker;
- recursion r;
- };
-
- std::shared_ptr<new_worker_state> state;
-
- public:
- virtual ~new_worker()
- {
- }
-
- explicit new_worker(std::shared_ptr<new_worker_state> ws)
- : state(ws)
- {
- }
-
- new_worker(composite_subscription cs, thread_factory& tf)
- : state(std::make_shared<new_worker_state>(cs))
- {
- auto keepAlive = state;
-
- state->lifetime.add([keepAlive](){
- std::unique_lock<std::mutex> guard(keepAlive->lock);
- auto expired = std::move(keepAlive->q);
- keepAlive->q = new_worker_state::queue_item_time{};
- if (!keepAlive->q.empty()) std::terminate();
- keepAlive->wake.notify_one();
-
- // ~new_worker_state cleans up the std::thread
- });
-
- state->worker = tf([keepAlive](){
-
- // take ownership
- queue_type::ensure(std::make_shared<new_worker>(keepAlive));
- // release ownership
- RXCPP_UNWIND_AUTO([]{
- queue_type::destroy();
- });
-
- for(;;) {
- std::unique_lock<std::mutex> guard(keepAlive->lock);
- if (keepAlive->q.empty()) {
- keepAlive->wake.wait(guard, [keepAlive](){
- return !keepAlive->lifetime.is_subscribed() || !keepAlive->q.empty();
- });
- }
- if (!keepAlive->lifetime.is_subscribed()) {
- break;
- }
- auto& peek = keepAlive->q.top();
- if (!peek.what.is_subscribed()) {
- keepAlive->q.pop();
- continue;
- }
- if (clock_type::now() < peek.when) {
- keepAlive->wake.wait_until(guard, peek.when);
- continue;
- }
- auto what = peek.what;
- keepAlive->q.pop();
- keepAlive->r.reset(keepAlive->q.empty());
- guard.unlock();
- what(keepAlive->r.get_recurse());
- }
- });
- }
-
- virtual clock_type::time_point now() const {
- return clock_type::now();
- }
-
- virtual void schedule(const schedulable& scbl) const {
- schedule(now(), scbl);
- }
-
- virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
- if (scbl.is_subscribed()) {
- std::unique_lock<std::mutex> guard(state->lock);
- state->q.push(new_worker_state::item_type(when, scbl));
- state->r.reset(false);
- }
- state->wake.notify_one();
- }
- };
-
- mutable thread_factory factory;
-
-public:
- new_thread()
- : factory([](std::function<void()> start){
- return std::thread(std::move(start));
- })
- {
- }
- explicit new_thread(thread_factory tf)
- : factory(tf)
- {
- }
- virtual ~new_thread()
- {
- }
-
- virtual clock_type::time_point now() const {
- return clock_type::now();
- }
-
- virtual worker create_worker(composite_subscription cs) const {
- return worker(cs, std::make_shared<new_worker>(cs, factory));
- }
-};
-
-inline scheduler make_new_thread() {
- static scheduler instance = make_scheduler<new_thread>();
- return instance;
-}
-inline scheduler make_new_thread(thread_factory tf) {
- return make_scheduler<new_thread>(tf);
-}
-
-}
-
-}
-
-#endif