aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLarry Safran <lsafran@google.com>2023-08-18 10:16:43 -0700
committerGitHub <noreply@github.com>2023-08-18 10:16:43 -0700
commit55c5040cb5bb0035eb741f20c8989247c505ffe9 (patch)
treead8c571e701bde939ddb5f2396f5ba30b812e9e1
parenteb18cba062d8f87e5647b1d642570b2399ccea8c (diff)
downloadgrpc-grpc-java-55c5040cb5bb0035eb741f20c8989247c505ffe9.tar.gz
Remove ThreadlessExecutor from BlockingServerStream (#10496)
* Remove ThreadlessExecutor from BlockingServerStream fixes #10490
-rw-r--r--stub/src/main/java/io/grpc/stub/ClientCalls.java63
1 files changed, 14 insertions, 49 deletions
diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java
index 74c315058..13fb00d3b 100644
--- a/stub/src/main/java/io/grpc/stub/ClientCalls.java
+++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java
@@ -133,9 +133,7 @@ public final class ClientCalls {
public static <ReqT, RespT> RespT blockingUnaryCall(ClientCall<ReqT, RespT> call, ReqT req) {
try {
return getUnchecked(futureUnaryCall(call, req));
- } catch (RuntimeException e) {
- throw cancelThrow(call, e);
- } catch (Error e) {
+ } catch (RuntimeException | Error e) {
throw cancelThrow(call, e);
}
}
@@ -167,10 +165,7 @@ public final class ClientCalls {
}
executor.shutdown();
return getUnchecked(responseFuture);
- } catch (RuntimeException e) {
- // Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
- throw cancelThrow(call, e);
- } catch (Error e) {
+ } catch (RuntimeException | Error e) {
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
throw cancelThrow(call, e);
} finally {
@@ -206,14 +201,12 @@ public final class ClientCalls {
*
* @return an iterator over the response stream.
*/
- // TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs.
public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
- ThreadlessExecutor executor = new ThreadlessExecutor();
ClientCall<ReqT, RespT> call = channel.newCall(method,
- callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING)
- .withExecutor(executor));
- BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call, executor);
+ callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING));
+
+ BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call);
asyncUnaryRequestCall(call, req, result.listener());
return result;
}
@@ -288,8 +281,7 @@ public final class ClientCalls {
private static RuntimeException cancelThrow(ClientCall<?, ?> call, Throwable t) {
try {
call.cancel(null, t);
- } catch (Throwable e) {
- assert e instanceof RuntimeException || e instanceof Error;
+ } catch (RuntimeException | Error e) {
logger.log(Level.SEVERE, "RuntimeException encountered while closing call", e);
}
if (t instanceof RuntimeException) {
@@ -320,9 +312,7 @@ public final class ClientCalls {
try {
call.sendMessage(req);
call.halfClose();
- } catch (RuntimeException e) {
- throw cancelThrow(call, e);
- } catch (Error e) {
+ } catch (RuntimeException | Error e) {
throw cancelThrow(call, e);
}
}
@@ -597,20 +587,12 @@ public final class ClientCalls {
private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<>(3);
private final StartableListener<T> listener = new QueuingListener();
private final ClientCall<?, T> call;
- /** May be null. */
- private final ThreadlessExecutor threadless;
// Only accessed when iterating.
private Object last;
// Non private to avoid synthetic class
BlockingResponseStream(ClientCall<?, T> call) {
- this(call, null);
- }
-
- // Non private to avoid synthetic class
- BlockingResponseStream(ClientCall<?, T> call, ThreadlessExecutor threadless) {
this.call = call;
- this.threadless = threadless;
}
StartableListener<T> listener() {
@@ -620,31 +602,14 @@ public final class ClientCalls {
private Object waitForNext() {
boolean interrupt = false;
try {
- if (threadless == null) {
- while (true) {
- try {
- return buffer.take();
- } catch (InterruptedException ie) {
- interrupt = true;
- call.cancel("Thread interrupted", ie);
- // Now wait for onClose() to be called, to guarantee BlockingQueue doesn't fill
- }
- }
- } else {
- Object next;
- while ((next = buffer.poll()) == null) {
- try {
- threadless.waitAndDrain();
- } catch (InterruptedException ie) {
- interrupt = true;
- call.cancel("Thread interrupted", ie);
- // Now wait for onClose() to be called, so interceptors can clean up
- }
- }
- if (next == this || next instanceof StatusRuntimeException) {
- threadless.shutdown();
+ while (true) {
+ try {
+ return buffer.take();
+ } catch (InterruptedException ie) {
+ interrupt = true;
+ call.cancel("Thread interrupted", ie);
+ // Now wait for onClose() to be called, to guarantee BlockingQueue doesn't fill
}
- return next;
}
} finally {
if (interrupt) {