summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjiayl@webrtc.org <jiayl@webrtc.org>2014-09-24 17:14:05 +0000
committerjiayl@webrtc.org <jiayl@webrtc.org>2014-09-24 17:14:05 +0000
commit8fd0eda9a5709127accccba383b258faf3503d2b (patch)
treecc486400dcc8805feb88a354d3c3644b474fe506
parent85b5766ba62b2ffdbe0acd8222653dc1cb32c7ae (diff)
downloadwebrtc-8fd0eda9a5709127accccba383b258faf3503d2b.tar.gz
Fix a problem in Thread::Send.
Previously if thread A->Send is called on thread B, B->ReceiveSends will be called, which enables an arbitrary thread to invoke calls on B while B is wait for A->Send to return. This caused mutliple problems like issue 3559, 3579. The fix is to limit B->ReceiveSends to only process requests from A. Also disallow the worker thread invoking other threads. BUG=3559 R=juberti@webrtc.org Review URL: https://webrtc-codereview.appspot.com/15089004 git-svn-id: http://webrtc.googlecode.com/svn/trunk/webrtc@7290 4adac7df-926f-26a2-2b94-8c16560cd09d
-rw-r--r--base/thread.cc33
-rw-r--r--base/thread.h19
-rw-r--r--base/thread_unittest.cc72
3 files changed, 112 insertions, 12 deletions
diff --git a/base/thread.cc b/base/thread.cc
index 9d2917d9..40257ab8 100644
--- a/base/thread.cc
+++ b/base/thread.cc
@@ -411,15 +411,12 @@ void Thread::Stop() {
}
void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) {
- AssertBlockingIsAllowedOnCurrentThread();
-
if (fStop_)
return;
// Sent messages are sent to the MessageHandler directly, in the context
// of "thread", like Win32 SendMessage. If in the right context,
// call the handler directly.
-
Message msg;
msg.phandler = phandler;
msg.message_id = id;
@@ -429,6 +426,8 @@ void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) {
return;
}
+ AssertBlockingIsAllowedOnCurrentThread();
+
AutoThread thread;
Thread *current_thread = Thread::Current();
ASSERT(current_thread != NULL); // AutoThread ensures this
@@ -451,7 +450,9 @@ void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) {
crit_.Enter();
while (!ready) {
crit_.Leave();
- current_thread->ReceiveSends();
+ // We need to limit "ReceiveSends" to |this| thread to avoid an arbitrary
+ // thread invoking calls on the current thread.
+ current_thread->ReceiveSendsFromThread(this);
current_thread->socketserver()->Wait(kForever, false);
waited = true;
crit_.Enter();
@@ -475,17 +476,23 @@ void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) {
}
void Thread::ReceiveSends() {
+ ReceiveSendsFromThread(NULL);
+}
+
+void Thread::ReceiveSendsFromThread(const Thread* source) {
// Receive a sent message. Cleanup scenarios:
// - thread sending exits: We don't allow this, since thread can exit
// only via Join, so Send must complete.
// - thread receiving exits: Wakeup/set ready in Thread::Clear()
// - object target cleared: Wakeup/set ready in Thread::Clear()
+ _SendMessage smsg;
+
crit_.Enter();
- while (!sendlist_.empty()) {
- _SendMessage smsg = sendlist_.front();
- sendlist_.pop_front();
+ while (PopSendMessageFromThread(source, &smsg)) {
crit_.Leave();
+
smsg.msg.phandler->OnMessage(&smsg.msg);
+
crit_.Enter();
*smsg.ready = true;
smsg.thread->socketserver()->WakeUp();
@@ -493,6 +500,18 @@ void Thread::ReceiveSends() {
crit_.Leave();
}
+bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) {
+ for (std::list<_SendMessage>::iterator it = sendlist_.begin();
+ it != sendlist_.end(); ++it) {
+ if (it->thread == source || source == NULL) {
+ *msg = *it;
+ sendlist_.erase(it);
+ return true;
+ }
+ }
+ return false;
+}
+
void Thread::Clear(MessageHandler *phandler, uint32 id,
MessageList* removed) {
CritScope cs(&crit_);
diff --git a/base/thread.h b/base/thread.h
index 25b0f569..34ec45e3 100644
--- a/base/thread.h
+++ b/base/thread.h
@@ -165,7 +165,6 @@ class Thread : public MessageQueue {
// See ScopedDisallowBlockingCalls for details.
template <class ReturnT, class FunctorT>
ReturnT Invoke(const FunctorT& functor) {
- AssertBlockingIsAllowedOnCurrentThread();
FunctorMessageHandler<ReturnT, FunctorT> handler(functor);
Send(&handler);
return handler.result();
@@ -210,6 +209,10 @@ class Thread : public MessageQueue {
// of whatever code is conditionally executing because of the return value!
bool RunningForTest() { return running(); }
+ // Sets the per-thread allow-blocking-calls flag and returns the previous
+ // value.
+ bool SetAllowBlockingCalls(bool allow);
+
protected:
// This method should be called when thread is created using non standard
// method, like derived implementation of rtc::Thread and it can not be
@@ -226,10 +229,6 @@ class Thread : public MessageQueue {
// Blocks the calling thread until this thread has terminated.
void Join();
- // Sets the per-thread allow-blocking-calls flag and returns the previous
- // value.
- bool SetAllowBlockingCalls(bool allow);
-
static void AssertBlockingIsAllowedOnCurrentThread();
friend class ScopedDisallowBlockingCalls;
@@ -248,6 +247,16 @@ class Thread : public MessageQueue {
// Return true if the thread was started and hasn't yet stopped.
bool running() { return running_.Wait(0); }
+ // Processes received "Send" requests. If |source| is not NULL, only requests
+ // from |source| are processed, otherwise, all requests are processed.
+ void ReceiveSendsFromThread(const Thread* source);
+
+ // If |source| is not NULL, pops the first "Send" message from |source| in
+ // |sendlist_|, otherwise, pops the first "Send" message of |sendlist_|.
+ // The caller must lock |crit_| before calling.
+ // Returns true if there is such a message.
+ bool PopSendMessageFromThread(const Thread* source, _SendMessage* msg);
+
std::list<_SendMessage> sendlist_;
std::string name_;
ThreadPriority priority_;
diff --git a/base/thread_unittest.cc b/base/thread_unittest.cc
index 4229df28..57b6df66 100644
--- a/base/thread_unittest.cc
+++ b/base/thread_unittest.cc
@@ -276,6 +276,78 @@ TEST(ThreadTest, DISABLED_ON_MAC(Invoke)) {
thread.Invoke<void>(&LocalFuncs::Func2);
}
+// Verifies that two threads calling Invoke on each other at the same time does
+// not deadlock.
+TEST(ThreadTest, TwoThreadsInvokeNoDeadlock) {
+ AutoThread thread;
+ Thread* current_thread = Thread::Current();
+ ASSERT_TRUE(current_thread != NULL);
+
+ Thread other_thread;
+ other_thread.Start();
+
+ struct LocalFuncs {
+ static void Set(bool* out) { *out = true; }
+ static void InvokeSet(Thread* thread, bool* out) {
+ thread->Invoke<void>(Bind(&Set, out));
+ }
+ };
+
+ bool called = false;
+ other_thread.Invoke<void>(
+ Bind(&LocalFuncs::InvokeSet, current_thread, &called));
+
+ EXPECT_TRUE(called);
+}
+
+// Verifies that if thread A invokes a call on thread B and thread C is trying
+// to invoke A at the same time, thread A does not handle C's invoke while
+// invoking B.
+TEST(ThreadTest, ThreeThreadsInvoke) {
+ AutoThread thread;
+ Thread* thread_a = Thread::Current();
+ Thread thread_b, thread_c;
+ thread_b.Start();
+ thread_c.Start();
+
+ struct LocalFuncs {
+ static void Set(bool* out) { *out = true; }
+ static void InvokeSet(Thread* thread, bool* out) {
+ thread->Invoke<void>(Bind(&Set, out));
+ }
+
+ // Set |out| true and call InvokeSet on |thread|.
+ static void SetAndInvokeSet(bool* out, Thread* thread, bool* out_inner) {
+ *out = true;
+ InvokeSet(thread, out_inner);
+ }
+
+ // Asynchronously invoke SetAndInvokeSet on |thread1| and wait until
+ // |thread1| starts the call.
+ static void AsyncInvokeSetAndWait(
+ Thread* thread1, Thread* thread2, bool* out) {
+ bool async_invoked = false;
+
+ AsyncInvoker invoker;
+ invoker.AsyncInvoke<void>(
+ thread1, Bind(&SetAndInvokeSet, &async_invoked, thread2, out));
+
+ EXPECT_TRUE_WAIT(async_invoked, 2000);
+ }
+ };
+
+ bool thread_a_called = false;
+
+ // Start the sequence A --(invoke)--> B --(async invoke)--> C --(invoke)--> A.
+ // Thread B returns when C receives the call and C should be blocked until A
+ // starts to process messages.
+ thread_b.Invoke<void>(Bind(&LocalFuncs::AsyncInvokeSetAndWait,
+ &thread_c, thread_a, &thread_a_called));
+ EXPECT_FALSE(thread_a_called);
+
+ EXPECT_TRUE_WAIT(thread_a_called, 2000);
+}
+
class AsyncInvokeTest : public testing::Test {
public:
void IntCallback(int value) {