aboutsummaryrefslogtreecommitdiff
path: root/guava/src/com/google/common/util/concurrent/InterruptibleTask.java
blob: 6f33c5032da503c981fee81db18ea32fa36a95ee (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
/*
 * Copyright (C) 2015 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.util.concurrent.NullnessCasts.uncheckedCastNullableTToT;
import static com.google.common.util.concurrent.Platform.restoreInterruptIfIsInterruptedException;

import com.google.common.annotations.GwtCompatible;
import com.google.common.annotations.VisibleForTesting;
import com.google.j2objc.annotations.ReflectionSupport;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.AbstractOwnableSynchronizer;
import java.util.concurrent.locks.LockSupport;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * A {@link Runnable} that executes {@link #runInterruptibly()} at most once and coordinates with
 * {@link #interruptTask()} so that an interrupt requested through this task reaches the running
 * thread before {@link #run()} invokes {@link #afterRanInterruptiblySuccess} or {@link
 * #afterRanInterruptiblyFailure}.
 *
 * <p>The value held by the inherited {@link AtomicReference} is the task's state:
 *
 * <ul>
 *   <li>{@code null}: no thread has claimed the task yet.
 *   <li>a {@link Thread}: that thread is currently executing {@link #run()}.
 *   <li>a {@link Blocker}: {@link #interruptTask()} is in the middle of interrupting the runner.
 *   <li>{@code PARKED}: the runner stopped spinning and parked until the interrupter finishes.
 *   <li>{@code DONE}: the runner finished, or the interrupter finished interrupting it.
 * </ul>
 */
@GwtCompatible(emulated = true)
@ReflectionSupport(value = ReflectionSupport.Level.FULL)
@ElementTypesAreNonnullByDefault
// Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs that cause
// getDeclaredField to throw a NoSuchFieldException when the field is definitely there.
// Since this class only needs CAS on one field, we can avoid this bug by extending AtomicReference
// instead of using an AtomicReferenceFieldUpdater. This reference stores Thread instances,
// Blocker instances and the DONE/PARKED sentinels - they have a common ancestor of Runnable.
abstract class InterruptibleTask<T extends @Nullable Object>
    extends AtomicReference<@Nullable Runnable> implements Runnable {
  static {
    // Prevent rare disastrous classloading in first call to LockSupport.park.
    // See: https://bugs.openjdk.java.net/browse/JDK-8074773
    @SuppressWarnings("unused")
    Class<?> ensureLoaded = LockSupport.class;
  }

  /** Type of the state sentinels below; instances are only ever compared by identity. */
  private static final class DoNothingRunnable implements Runnable {
    @Override
    public void run() {}
  }

  // The thread executing the task publishes itself to the superclass' reference and the thread
  // interrupting sets DONE when it has finished interrupting.
  private static final Runnable DONE = new DoNothingRunnable();
  // Set by the runner thread (see waitForInterrupt) right before it parks, so that the interrupter
  // knows it has to unpark the runner once it has finished interrupting.
  private static final Runnable PARKED = new DoNothingRunnable();
  // Number of Thread.yield() spins waitForInterrupt performs before it falls back to parking.
  // Why 1000?  WHY NOT!
  private static final int MAX_BUSY_WAIT_SPINS = 1000;

  /**
   * Claims the task for the calling thread, calls {@link #runInterruptibly()} unless {@link
   * #isDone()} already returns true, and then reports the outcome through {@link
   * #afterRanInterruptiblySuccess} or {@link #afterRanInterruptiblyFailure}.
   *
   * <p>Returns immediately, doing nothing, if the task has already been claimed by any thread.
   * Before a callback is invoked the state is moved to {@code DONE}; if an interrupter got there
   * first, this method waits for it to finish (see {@link #waitForInterrupt}).
   */
  @Override
  public final void run() {
    /*
     * Set runner thread before checking isDone(). If we were to check isDone() first, the task
     * might be cancelled before we set the runner thread. That would make it impossible to
     * interrupt, yet it will still run, since interruptTask will leave the runner value null,
     * allowing the CAS below to succeed.
     */
    Thread currentThread = Thread.currentThread();
    if (!compareAndSet(null, currentThread)) {
      return; // someone else has run or is running.
    }

    boolean run = !isDone();
    T result = null;
    Throwable error = null;
    try {
      if (run) {
        result = runInterruptibly();
      }
    } catch (Throwable t) {
      restoreInterruptIfIsInterruptedException(t);
      error = t;
    } finally {
      // Attempt to set the task as done so that further attempts to interrupt will fail.
      if (!compareAndSet(currentThread, DONE)) {
        // The CAS can only fail because interruptTask() replaced our Thread with a Blocker.
        waitForInterrupt(currentThread);
      }
      if (run) {
        if (error == null) {
          // The cast is safe because of the `run` and `error` checks.
          afterRanInterruptiblySuccess(uncheckedCastNullableTToT(result));
        } else {
          afterRanInterruptiblyFailure(error);
        }
      }
    }
  }

  /**
   * Blocks the runner thread until the thread executing {@link #interruptTask()} has stored {@code
   * DONE}, first by yielding up to {@code MAX_BUSY_WAIT_SPINS} times and then by parking.
   *
   * @param currentThread the runner thread, i.e. the thread executing this method
   */
  // The cow told me to: Thread.yield() is used on purpose in the spin loop below.
  @SuppressWarnings("ThreadPriorityCheck")
  private void waitForInterrupt(Thread currentThread) {
    /*
     * If someone called cancel(true), it is possible that the interrupted bit hasn't been set yet.
     * Wait for the interrupting thread to set DONE. (See interruptTask().) We want to wait so that
     * the interrupting thread doesn't interrupt the _next_ thing to run on this thread.
     *
     * Note: We don't reset the interrupted bit, just wait for it to be set. If this is a thread
     * pool thread, the thread pool will reset it for us. Otherwise, the interrupted bit may have
     * been intended for something else, so don't clear it.
     */
    boolean restoreInterruptedBit = false;
    int spinCount = 0;
    // Interrupting Cow Says:
    //  ______
    // < Spin >
    //  ------
    //        \   ^__^
    //         \  (oo)\_______
    //            (__)\       )\/\
    //                ||----w |
    //                ||     ||
    Runnable state = get();
    Blocker blocker = null;
    while (state instanceof Blocker || state == PARKED) {
      if (state instanceof Blocker) {
        // Remember the Blocker: once we overwrite the state with PARKED it is no longer readable
        // from the reference, but we still want to pass it to park() for diagnostics.
        blocker = (Blocker) state;
      }
      spinCount++;
      if (spinCount > MAX_BUSY_WAIT_SPINS) {
        /*
         * If we have spun a lot, just park ourselves. This will save CPU while we wait for a slow
         * interrupting thread. In theory, interruptTask() should be very fast, but due to
         * InterruptibleChannel and JavaLangAccess.blockedOn(Thread, Interruptible), it isn't
         * predictable what work might be done. (e.g., close a file and flush buffers to disk). To
         * protect ourselves from this, we park ourselves and tell our interrupter that we did so.
         */
        if (state == PARKED || compareAndSet(state, PARKED)) {
          // Interrupting Cow Says:
          //  ______
          // < Park >
          //  ------
          //        \   ^__^
          //         \  (oo)\_______
          //            (__)\       )\/\
          //                ||----w |
          //                ||     ||
          // We need to clear the interrupted bit prior to calling park and maintain it in case we
          // wake up spuriously.
          restoreInterruptedBit = Thread.interrupted() || restoreInterruptedBit;
          LockSupport.park(blocker);
        }
      } else {
        Thread.yield();
      }
      state = get();
    }
    if (restoreInterruptedBit) {
      currentThread.interrupt();
    }
    /*
     * TODO(cpovirk): Clear interrupt status here? We currently don't, which means that an interrupt
     * before, during, or after runInterruptibly() (unless it produced an InterruptedException
     * caught above) can linger and affect listeners.
     */
  }

  /**
   * Called before runInterruptibly - if true, neither runInterruptibly nor
   * afterRanInterruptiblySuccess/afterRanInterruptiblyFailure will be called.
   */
  abstract boolean isDone();

  /**
   * Do interruptible work here - do not complete Futures here, as their listeners could be
   * interrupted.
   */
  @ParametricNullness
  abstract T runInterruptibly() throws Exception;

  /**
   * Called when runInterruptibly returned normally. Any interruption that happens as a result of
   * calling interruptTask will arrive before this method is called. Complete Futures here.
   */
  abstract void afterRanInterruptiblySuccess(@ParametricNullness T result);

  /**
   * Called when runInterruptibly threw. Any interruption that happens as a result of calling
   * interruptTask will arrive before this method is called. Complete Futures here.
   */
  abstract void afterRanInterruptiblyFailure(Throwable error);

  /**
   * Interrupts the running task. Because this internally calls {@link Thread#interrupt()} which can
   * in turn invoke arbitrary code it is not safe to call while holding a lock.
   *
   * <p>Does nothing unless a thread is currently executing {@link #run()} and has not yet moved the
   * state to {@code DONE}.
   */
  final void interruptTask() {
    // Since the Thread is replaced by DONE before run() invokes listeners or returns, if we succeed
    // in this CAS, there's no risk of interrupting the wrong thread or interrupting a thread that
    // isn't currently executing this task.
    Runnable currentRunner = get();
    if (currentRunner instanceof Thread) {
      Blocker blocker = new Blocker(this);
      blocker.setOwner(Thread.currentThread());
      if (compareAndSet(currentRunner, blocker)) {
        // Thread.interrupt can throw arbitrary exceptions due to the nio InterruptibleChannel API
        // This will make sure that tasks don't get stuck busy waiting.
        // Some of this is fixed in jdk11 (see https://bugs.openjdk.java.net/browse/JDK-8198692) but
        // not all.  See the test cases for examples on how this can happen.
        try {
          ((Thread) currentRunner).interrupt();
        } finally {
          Runnable prev = getAndSet(DONE);
          if (prev == PARKED) {
            // The runner gave up spinning and parked itself; wake it so it can observe DONE.
            LockSupport.unpark((Thread) currentRunner);
          }
        }
      }
    }
  }

  /**
   * Using this as the blocker object allows introspection and debugging tools to see that the
   * currentRunner thread is blocked on the progress of the interrupter thread, which can help
   * identify deadlocks.
   */
  @VisibleForTesting
  static final class Blocker extends AbstractOwnableSynchronizer implements Runnable {
    private final InterruptibleTask<?> task;

    private Blocker(InterruptibleTask<?> task) {
      this.task = task;
    }

    @Override
    public void run() {}

    /** Records {@code thread} (the interrupter) as the exclusive owner of this synchronizer. */
    private void setOwner(Thread thread) {
      super.setExclusiveOwnerThread(thread);
    }

    /** Returns the thread recorded by {@link #setOwner}, i.e. the interrupter. */
    @VisibleForTesting
    Thread getOwner() {
      return super.getExclusiveOwnerThread();
    }

    @Override
    public String toString() {
      return task.toString();
    }
  }

  /**
   * Returns a description of the current state, followed by {@code ", "} and {@link
   * #toPendingString()}.
   */
  @Override
  public final String toString() {
    Runnable state = get();
    String result;
    if (state == DONE) {
      result = "running=[DONE]";
    } else if (state instanceof Blocker || state == PARKED) {
      // PARKED is only ever entered from the Blocker state (the runner is parked waiting for the
      // interrupter to finish), so it is reported like the Blocker state instead of falling
      // through to "NOT STARTED YET".
      result = "running=[INTERRUPTED]";
    } else if (state instanceof Thread) {
      // getName is final on Thread, no need to worry about exceptions
      result = "running=[RUNNING ON " + ((Thread) state).getName() + "]";
    } else {
      // Only null remains: no thread has claimed the task yet.
      result = "running=[NOT STARTED YET]";
    }
    return result + ", " + toPendingString();
  }

  /** Returns the subclass-specific text that {@link #toString()} appends after the state. */
  abstract String toPendingString();
}