diff options
author | jiayl@webrtc.org <jiayl@webrtc.org> | 2014-09-24 17:14:05 +0000 |
---|---|---|
committer | jiayl@webrtc.org <jiayl@webrtc.org> | 2014-09-24 17:14:05 +0000 |
commit | 8fd0eda9a5709127accccba383b258faf3503d2b (patch) | |
tree | cc486400dcc8805feb88a354d3c3644b474fe506 | |
parent | 85b5766ba62b2ffdbe0acd8222653dc1cb32c7ae (diff) | |
download | webrtc-8fd0eda9a5709127accccba383b258faf3503d2b.tar.gz |
Fix a problem in Thread::Send.
Previously if thread A->Send is called on thread B, B->ReceiveSends will be called, which enables an arbitrary thread to invoke calls on B while B is wait for A->Send to return. This caused mutliple problems like issue 3559, 3579.
The fix is to limit B->ReceiveSends to only process requests from A.
Also disallow the worker thread invoking other threads.
BUG=3559
R=juberti@webrtc.org
Review URL: https://webrtc-codereview.appspot.com/15089004
git-svn-id: http://webrtc.googlecode.com/svn/trunk/webrtc@7290 4adac7df-926f-26a2-2b94-8c16560cd09d
-rw-r--r-- | base/thread.cc | 33 | ||||
-rw-r--r-- | base/thread.h | 19 | ||||
-rw-r--r-- | base/thread_unittest.cc | 72 |
3 files changed, 112 insertions, 12 deletions
diff --git a/base/thread.cc b/base/thread.cc index 9d2917d9..40257ab8 100644 --- a/base/thread.cc +++ b/base/thread.cc @@ -411,15 +411,12 @@ void Thread::Stop() { } void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) { - AssertBlockingIsAllowedOnCurrentThread(); - if (fStop_) return; // Sent messages are sent to the MessageHandler directly, in the context // of "thread", like Win32 SendMessage. If in the right context, // call the handler directly. - Message msg; msg.phandler = phandler; msg.message_id = id; @@ -429,6 +426,8 @@ void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) { return; } + AssertBlockingIsAllowedOnCurrentThread(); + AutoThread thread; Thread *current_thread = Thread::Current(); ASSERT(current_thread != NULL); // AutoThread ensures this @@ -451,7 +450,9 @@ void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) { crit_.Enter(); while (!ready) { crit_.Leave(); - current_thread->ReceiveSends(); + // We need to limit "ReceiveSends" to |this| thread to avoid an arbitrary + // thread invoking calls on the current thread. + current_thread->ReceiveSendsFromThread(this); current_thread->socketserver()->Wait(kForever, false); waited = true; crit_.Enter(); @@ -475,17 +476,23 @@ void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) { } void Thread::ReceiveSends() { + ReceiveSendsFromThread(NULL); +} + +void Thread::ReceiveSendsFromThread(const Thread* source) { // Receive a sent message. Cleanup scenarios: // - thread sending exits: We don't allow this, since thread can exit // only via Join, so Send must complete. // - thread receiving exits: Wakeup/set ready in Thread::Clear() // - object target cleared: Wakeup/set ready in Thread::Clear() + _SendMessage smsg; + crit_.Enter(); - while (!sendlist_.empty()) { - _SendMessage smsg = sendlist_.front(); - sendlist_.pop_front(); + while (PopSendMessageFromThread(source, &smsg)) { crit_.Leave(); + smsg.msg.phandler->OnMessage(&smsg.msg); + crit_.Enter(); *smsg.ready = true; smsg.thread->socketserver()->WakeUp(); @@ -493,6 +500,18 @@ void Thread::ReceiveSends() { crit_.Leave(); } +bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) { + for (std::list<_SendMessage>::iterator it = sendlist_.begin(); + it != sendlist_.end(); ++it) { + if (it->thread == source || source == NULL) { + *msg = *it; + sendlist_.erase(it); + return true; + } + } + return false; +} + void Thread::Clear(MessageHandler *phandler, uint32 id, MessageList* removed) { CritScope cs(&crit_); diff --git a/base/thread.h b/base/thread.h index 25b0f569..34ec45e3 100644 --- a/base/thread.h +++ b/base/thread.h @@ -165,7 +165,6 @@ class Thread : public MessageQueue { // See ScopedDisallowBlockingCalls for details. template <class ReturnT, class FunctorT> ReturnT Invoke(const FunctorT& functor) { - AssertBlockingIsAllowedOnCurrentThread(); FunctorMessageHandler<ReturnT, FunctorT> handler(functor); Send(&handler); return handler.result(); @@ -210,6 +209,10 @@ class Thread : public MessageQueue { // of whatever code is conditionally executing because of the return value! bool RunningForTest() { return running(); } + // Sets the per-thread allow-blocking-calls flag and returns the previous + // value. + bool SetAllowBlockingCalls(bool allow); + protected: // This method should be called when thread is created using non standard // method, like derived implementation of rtc::Thread and it can not be @@ -226,10 +229,6 @@ class Thread : public MessageQueue { // Blocks the calling thread until this thread has terminated. void Join(); - // Sets the per-thread allow-blocking-calls flag and returns the previous - // value. - bool SetAllowBlockingCalls(bool allow); - static void AssertBlockingIsAllowedOnCurrentThread(); friend class ScopedDisallowBlockingCalls; @@ -248,6 +247,16 @@ class Thread : public MessageQueue { // Return true if the thread was started and hasn't yet stopped. bool running() { return running_.Wait(0); } + // Processes received "Send" requests. If |source| is not NULL, only requests + // from |source| are processed, otherwise, all requests are processed. + void ReceiveSendsFromThread(const Thread* source); + + // If |source| is not NULL, pops the first "Send" message from |source| in + // |sendlist_|, otherwise, pops the first "Send" message of |sendlist_|. + // The caller must lock |crit_| before calling. + // Returns true if there is such a message. + bool PopSendMessageFromThread(const Thread* source, _SendMessage* msg); + std::list<_SendMessage> sendlist_; std::string name_; ThreadPriority priority_; diff --git a/base/thread_unittest.cc b/base/thread_unittest.cc index 4229df28..57b6df66 100644 --- a/base/thread_unittest.cc +++ b/base/thread_unittest.cc @@ -276,6 +276,78 @@ TEST(ThreadTest, DISABLED_ON_MAC(Invoke)) { thread.Invoke<void>(&LocalFuncs::Func2); } +// Verifies that two threads calling Invoke on each other at the same time does +// not deadlock. +TEST(ThreadTest, TwoThreadsInvokeNoDeadlock) { + AutoThread thread; + Thread* current_thread = Thread::Current(); + ASSERT_TRUE(current_thread != NULL); + + Thread other_thread; + other_thread.Start(); + + struct LocalFuncs { + static void Set(bool* out) { *out = true; } + static void InvokeSet(Thread* thread, bool* out) { + thread->Invoke<void>(Bind(&Set, out)); + } + }; + + bool called = false; + other_thread.Invoke<void>( + Bind(&LocalFuncs::InvokeSet, current_thread, &called)); + + EXPECT_TRUE(called); +} + +// Verifies that if thread A invokes a call on thread B and thread C is trying +// to invoke A at the same time, thread A does not handle C's invoke while +// invoking B. +TEST(ThreadTest, ThreeThreadsInvoke) { + AutoThread thread; + Thread* thread_a = Thread::Current(); + Thread thread_b, thread_c; + thread_b.Start(); + thread_c.Start(); + + struct LocalFuncs { + static void Set(bool* out) { *out = true; } + static void InvokeSet(Thread* thread, bool* out) { + thread->Invoke<void>(Bind(&Set, out)); + } + + // Set |out| true and call InvokeSet on |thread|. + static void SetAndInvokeSet(bool* out, Thread* thread, bool* out_inner) { + *out = true; + InvokeSet(thread, out_inner); + } + + // Asynchronously invoke SetAndInvokeSet on |thread1| and wait until + // |thread1| starts the call. + static void AsyncInvokeSetAndWait( + Thread* thread1, Thread* thread2, bool* out) { + bool async_invoked = false; + + AsyncInvoker invoker; + invoker.AsyncInvoke<void>( + thread1, Bind(&SetAndInvokeSet, &async_invoked, thread2, out)); + + EXPECT_TRUE_WAIT(async_invoked, 2000); + } + }; + + bool thread_a_called = false; + + // Start the sequence A --(invoke)--> B --(async invoke)--> C --(invoke)--> A. + // Thread B returns when C receives the call and C should be blocked until A + // starts to process messages. + thread_b.Invoke<void>(Bind(&LocalFuncs::AsyncInvokeSetAndWait, + &thread_c, thread_a, &thread_a_called)); + EXPECT_FALSE(thread_a_called); + + EXPECT_TRUE_WAIT(thread_a_called, 2000); +} + class AsyncInvokeTest : public testing::Test { public: void IntCallback(int value) { |