aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java/com/android/volley/RequestQueue.java
blob: 6db0b1cc68c9baaac0c95be5230f9c79575f8d02 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
/*
 * Copyright (C) 2011 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.volley;

import android.os.Handler;
import android.os.Looper;
import androidx.annotation.IntDef;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A request dispatch queue with a thread pool of dispatchers.
 *
 * <p>Calling {@link #add(Request)} will enqueue the given Request for dispatch, resolving from
 * either cache or network on a worker thread, and then delivering a parsed response on the main
 * thread.
 */
public class RequestQueue {

    /** Callback interface for completed requests. */
    // TODO: This should not be a generic class, because the request type can't be determined at
    // compile time, so all calls to onRequestFinished are unsafe. However, changing this would be
    // an API-breaking change. See also: https://github.com/google/volley/pull/109
    @Deprecated // Use RequestEventListener instead.
    public interface RequestFinishedListener<T> {
        /** Called when a request has finished processing. */
        void onRequestFinished(Request<T> request);
    }

    /** Request event types the listeners {@link RequestEventListener} will be notified about. */
    @Retention(RetentionPolicy.SOURCE)
    @IntDef({
        RequestEvent.REQUEST_QUEUED,
        RequestEvent.REQUEST_CACHE_LOOKUP_STARTED,
        RequestEvent.REQUEST_CACHE_LOOKUP_FINISHED,
        RequestEvent.REQUEST_NETWORK_DISPATCH_STARTED,
        RequestEvent.REQUEST_NETWORK_DISPATCH_FINISHED,
        RequestEvent.REQUEST_FINISHED
    })
    public @interface RequestEvent {
        /** The request was added to the queue. */
        public static final int REQUEST_QUEUED = 0;
        /** Cache lookup started for the request. */
        public static final int REQUEST_CACHE_LOOKUP_STARTED = 1;
        /**
         * Cache lookup finished for the request and cached response is delivered or request is
         * queued for network dispatching.
         */
        public static final int REQUEST_CACHE_LOOKUP_FINISHED = 2;
        /** Network dispatch started for the request. */
        public static final int REQUEST_NETWORK_DISPATCH_STARTED = 3;
        /** The network dispatch finished for the request and response (if any) is delivered. */
        public static final int REQUEST_NETWORK_DISPATCH_FINISHED = 4;
        /**
         * All the work associated with the request is finished and request is removed from all the
         * queues.
         */
        public static final int REQUEST_FINISHED = 5;
    }

    /** Callback interface for request life cycle events. */
    public interface RequestEventListener {
        /**
         * Called on every request lifecycle event. Can be called from different threads. The call
         * is blocking request processing, so any processing should be kept at minimum or moved to
         * another thread.
         */
        void onRequestEvent(Request<?> request, @RequestEvent int event);
    }

    /** Used for generating monotonically-increasing sequence numbers for requests. */
    private final AtomicInteger mSequenceGenerator = new AtomicInteger();

    /**
     * The set of all requests currently being processed by this RequestQueue. A Request will be in
     * this set if it is waiting in any queue or currently being processed by any dispatcher.
     */
    private final Set<Request<?>> mCurrentRequests = new HashSet<>();

    /** The cache triage queue. */
    private final PriorityBlockingQueue<Request<?>> mCacheQueue = new PriorityBlockingQueue<>();

    /** The queue of requests that are actually going out to the network. */
    private final PriorityBlockingQueue<Request<?>> mNetworkQueue = new PriorityBlockingQueue<>();

    /** Number of network request dispatcher threads to start. */
    private static final int DEFAULT_NETWORK_THREAD_POOL_SIZE = 4;

    /** Cache interface for retrieving and storing responses. */
    private final Cache mCache;

    /** Network interface for performing requests. */
    private final Network mNetwork;

    /** Response delivery mechanism. */
    private final ResponseDelivery mDelivery;

    /** The network dispatchers. */
    private final NetworkDispatcher[] mDispatchers;

    /** The cache dispatcher. */
    private CacheDispatcher mCacheDispatcher;

    private final List<RequestFinishedListener> mFinishedListeners = new ArrayList<>();

    /** Collection of listeners for request life cycle events. */
    private final List<RequestEventListener> mEventListeners = new ArrayList<>();

    /**
     * Creates the worker pool. Processing will not begin until {@link #start()} is called.
     *
     * @param cache A Cache to use for persisting responses to disk
     * @param network A Network interface for performing HTTP requests
     * @param threadPoolSize Number of network dispatcher threads to create
     * @param delivery A ResponseDelivery interface for posting responses and errors
     */
    public RequestQueue(
            Cache cache, Network network, int threadPoolSize, ResponseDelivery delivery) {
        mCache = cache;
        mNetwork = network;
        mDispatchers = new NetworkDispatcher[threadPoolSize];
        mDelivery = delivery;
    }

    /**
     * Creates the worker pool. Processing will not begin until {@link #start()} is called.
     *
     * @param cache A Cache to use for persisting responses to disk
     * @param network A Network interface for performing HTTP requests
     * @param threadPoolSize Number of network dispatcher threads to create
     */
    public RequestQueue(Cache cache, Network network, int threadPoolSize) {
        this(
                cache,
                network,
                threadPoolSize,
                new ExecutorDelivery(new Handler(Looper.getMainLooper())));
    }

    /**
     * Creates the worker pool. Processing will not begin until {@link #start()} is called.
     *
     * @param cache A Cache to use for persisting responses to disk
     * @param network A Network interface for performing HTTP requests
     */
    public RequestQueue(Cache cache, Network network) {
        this(cache, network, DEFAULT_NETWORK_THREAD_POOL_SIZE);
    }

    /** Starts the dispatchers in this queue. */
    public void start() {
        stop(); // Make sure any currently running dispatchers are stopped.
        // Create the cache dispatcher and start it.
        mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery);
        mCacheDispatcher.start();

        // Create network dispatchers (and corresponding threads) up to the pool size.
        for (int i = 0; i < mDispatchers.length; i++) {
            NetworkDispatcher networkDispatcher =
                    new NetworkDispatcher(mNetworkQueue, mNetwork, mCache, mDelivery);
            mDispatchers[i] = networkDispatcher;
            networkDispatcher.start();
        }
    }

    /** Stops the cache and network dispatchers. */
    public void stop() {
        if (mCacheDispatcher != null) {
            mCacheDispatcher.quit();
        }
        for (final NetworkDispatcher mDispatcher : mDispatchers) {
            if (mDispatcher != null) {
                mDispatcher.quit();
            }
        }
    }

    /** Gets a sequence number. */
    public int getSequenceNumber() {
        return mSequenceGenerator.incrementAndGet();
    }

    /** Gets the {@link Cache} instance being used. */
    public Cache getCache() {
        return mCache;
    }

    /**
     * A simple predicate or filter interface for Requests, for use by {@link
     * RequestQueue#cancelAll(RequestFilter)}.
     */
    public interface RequestFilter {
        boolean apply(Request<?> request);
    }

    /**
     * Cancels all requests in this queue for which the given filter applies.
     *
     * @param filter The filtering function to use
     */
    public void cancelAll(RequestFilter filter) {
        synchronized (mCurrentRequests) {
            for (Request<?> request : mCurrentRequests) {
                if (filter.apply(request)) {
                    request.cancel();
                }
            }
        }
    }

    /**
     * Cancels all requests in this queue with the given tag. Tag must be non-null and equality is
     * by identity.
     */
    public void cancelAll(final Object tag) {
        if (tag == null) {
            throw new IllegalArgumentException("Cannot cancelAll with a null tag");
        }
        cancelAll(
                new RequestFilter() {
                    @Override
                    public boolean apply(Request<?> request) {
                        return request.getTag() == tag;
                    }
                });
    }

    /**
     * Adds a Request to the dispatch queue.
     *
     * @param request The request to service
     * @return The passed-in request
     */
    public <T> Request<T> add(Request<T> request) {
        // Tag the request as belonging to this queue and add it to the set of current requests.
        request.setRequestQueue(this);
        synchronized (mCurrentRequests) {
            mCurrentRequests.add(request);
        }

        // Process requests in the order they are added.
        request.setSequence(getSequenceNumber());
        request.addMarker("add-to-queue");
        sendRequestEvent(request, RequestEvent.REQUEST_QUEUED);

        beginRequest(request);
        return request;
    }

    <T> void beginRequest(Request<T> request) {
        // If the request is uncacheable, skip the cache queue and go straight to the network.
        if (!request.shouldCache()) {
            sendRequestOverNetwork(request);
        } else {
            mCacheQueue.add(request);
        }
    }

    /**
     * Called from {@link Request#finish(String)}, indicating that processing of the given request
     * has finished.
     */
    @SuppressWarnings("unchecked") // see above note on RequestFinishedListener
    <T> void finish(Request<T> request) {
        // Remove from the set of requests currently being processed.
        synchronized (mCurrentRequests) {
            mCurrentRequests.remove(request);
        }
        synchronized (mFinishedListeners) {
            for (RequestFinishedListener<T> listener : mFinishedListeners) {
                listener.onRequestFinished(request);
            }
        }
        sendRequestEvent(request, RequestEvent.REQUEST_FINISHED);
    }

    /** Sends a request life cycle event to the listeners. */
    void sendRequestEvent(Request<?> request, @RequestEvent int event) {
        synchronized (mEventListeners) {
            for (RequestEventListener listener : mEventListeners) {
                listener.onRequestEvent(request, event);
            }
        }
    }

    /** Add a listener for request life cycle events. */
    public void addRequestEventListener(RequestEventListener listener) {
        synchronized (mEventListeners) {
            mEventListeners.add(listener);
        }
    }

    /** Remove a listener for request life cycle events. */
    public void removeRequestEventListener(RequestEventListener listener) {
        synchronized (mEventListeners) {
            mEventListeners.remove(listener);
        }
    }

    @Deprecated // Use RequestEventListener instead.
    public <T> void addRequestFinishedListener(RequestFinishedListener<T> listener) {
        synchronized (mFinishedListeners) {
            mFinishedListeners.add(listener);
        }
    }

    /** Remove a RequestFinishedListener. Has no effect if listener was not previously added. */
    @Deprecated // Use RequestEventListener instead.
    public <T> void removeRequestFinishedListener(RequestFinishedListener<T> listener) {
        synchronized (mFinishedListeners) {
            mFinishedListeners.remove(listener);
        }
    }

    public ResponseDelivery getResponseDelivery() {
        return mDelivery;
    }

    <T> void sendRequestOverNetwork(Request<T> request) {
        mNetworkQueue.add(request);
    }
}