diff options
author | Larry Safran <lsafran@google.com> | 2023-08-18 10:16:43 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-08-18 10:16:43 -0700 |
commit | 55c5040cb5bb0035eb741f20c8989247c505ffe9 (patch) | |
tree | ad8c571e701bde939ddb5f2396f5ba30b812e9e1 | |
parent | eb18cba062d8f87e5647b1d642570b2399ccea8c (diff) | |
download | grpc-grpc-java-55c5040cb5bb0035eb741f20c8989247c505ffe9.tar.gz |
Remove ThreadlessExecutor from BlockingServerStream (#10496)
* Remove ThreadlessExecutor from BlockingServerStream
fixes #10490
-rw-r--r-- | stub/src/main/java/io/grpc/stub/ClientCalls.java | 63 |
1 files changed, 14 insertions, 49 deletions
diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index 74c315058..13fb00d3b 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -133,9 +133,7 @@ public final class ClientCalls { public static <ReqT, RespT> RespT blockingUnaryCall(ClientCall<ReqT, RespT> call, ReqT req) { try { return getUnchecked(futureUnaryCall(call, req)); - } catch (RuntimeException e) { - throw cancelThrow(call, e); - } catch (Error e) { + } catch (RuntimeException | Error e) { throw cancelThrow(call, e); } } @@ -167,10 +165,7 @@ public final class ClientCalls { } executor.shutdown(); return getUnchecked(responseFuture); - } catch (RuntimeException e) { - // Something very bad happened. All bets are off; it may be dangerous to wait for onClose(). - throw cancelThrow(call, e); - } catch (Error e) { + } catch (RuntimeException | Error e) { // Something very bad happened. All bets are off; it may be dangerous to wait for onClose(). throw cancelThrow(call, e); } finally { @@ -206,14 +201,12 @@ public final class ClientCalls { * * @return an iterator over the response stream. */ - // TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs. public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall( Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) { - ThreadlessExecutor executor = new ThreadlessExecutor(); ClientCall<ReqT, RespT> call = channel.newCall(method, - callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING) - .withExecutor(executor)); - BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call, executor); + callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING)); + + BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call); asyncUnaryRequestCall(call, req, result.listener()); return result; } @@ -288,8 +281,7 @@ public final class ClientCalls { private static RuntimeException cancelThrow(ClientCall<?, ?> call, Throwable t) { try { call.cancel(null, t); - } catch (Throwable e) { - assert e instanceof RuntimeException || e instanceof Error; + } catch (RuntimeException | Error e) { logger.log(Level.SEVERE, "RuntimeException encountered while closing call", e); } if (t instanceof RuntimeException) { @@ -320,9 +312,7 @@ public final class ClientCalls { try { call.sendMessage(req); call.halfClose(); - } catch (RuntimeException e) { - throw cancelThrow(call, e); - } catch (Error e) { + } catch (RuntimeException | Error e) { throw cancelThrow(call, e); } } @@ -597,20 +587,12 @@ public final class ClientCalls { private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<>(3); private final StartableListener<T> listener = new QueuingListener(); private final ClientCall<?, T> call; - /** May be null. */ - private final ThreadlessExecutor threadless; // Only accessed when iterating. private Object last; // Non private to avoid synthetic class BlockingResponseStream(ClientCall<?, T> call) { - this(call, null); - } - - // Non private to avoid synthetic class - BlockingResponseStream(ClientCall<?, T> call, ThreadlessExecutor threadless) { this.call = call; - this.threadless = threadless; } StartableListener<T> listener() { @@ -620,31 +602,14 @@ public final class ClientCalls { private Object waitForNext() { boolean interrupt = false; try { - if (threadless == null) { - while (true) { - try { - return buffer.take(); - } catch (InterruptedException ie) { - interrupt = true; - call.cancel("Thread interrupted", ie); - // Now wait for onClose() to be called, to guarantee BlockingQueue doesn't fill - } - } - } else { - Object next; - while ((next = buffer.poll()) == null) { - try { - threadless.waitAndDrain(); - } catch (InterruptedException ie) { - interrupt = true; - call.cancel("Thread interrupted", ie); - // Now wait for onClose() to be called, so interceptors can clean up - } - } - if (next == this || next instanceof StatusRuntimeException) { - threadless.shutdown(); + while (true) { + try { + return buffer.take(); + } catch (InterruptedException ie) { + interrupt = true; + call.cancel("Thread interrupted", ie); + // Now wait for onClose() to be called, to guarantee BlockingQueue doesn't fill } - return next; } } finally { if (interrupt) { |