aboutsummaryrefslogtreecommitdiff
path: root/pw_rpc/java/main/dev/pigweed/pw_rpc/FutureCall.java
blob: c6953487f07c18eb453c44d29e720f98ffa39cab (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

package dev.pigweed.pw_rpc;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.MessageLite;
import dev.pigweed.pw_log.Logger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import javax.annotation.Nullable;

/**
 * Call implementation that represents the call as a ListenableFuture.
 *
 * This class suppresses ShouldNotSubclass warnings from ListenableFuture. It implements
 * ListenableFuture only because it cannot extend AbstractFuture since multiple inheritance is not
 * supported. No Future functionality is duplicated; FutureCall uses SettableFuture internally.
 */
@SuppressWarnings("ShouldNotSubclass")
abstract class FutureCall<RequestT extends MessageLite, ResponseT extends MessageLite, ResultT>
    extends AbstractCall<RequestT, ResponseT> implements ListenableFuture<ResultT> {
  private static final Logger logger = Logger.forClass(FutureCall.class);

  /** Backing future; every ListenableFuture method below except isCancelled() delegates to it. */
  private final SettableFuture<ResultT> future = SettableFuture.create();

  /**
   * Passes the endpoint and pending RPC through to AbstractCall.
   *
   * <p>The constructor is private, so only the classes nested in this file can extend FutureCall.
   */
  private FutureCall(Endpoint endpoint, PendingRpc rpc) {
    super(endpoint, rpc);
  }

  // ListenableFuture implementation, backed by the internal SettableFuture.

  /** Registers the listener on the internal future. */
  @Override
  public final void addListener(Runnable runnable, Executor executor) {
    future.addListener(runnable, executor);
  }

  /**
   * Returns true when this call's error status is CANCELLED, which signifies that a cancel() or
   * cancel(boolean) call succeeded.
   */
  @Override
  public final boolean isCancelled() {
    final Status callError = error();
    return callError == Status.CANCELLED;
  }

  /** Returns whether the internal future is done. */
  @Override
  public final boolean isDone() {
    return future.isDone();
  }

  /** Waits on the internal future and returns its result. */
  @Override
  public final ResultT get() throws InterruptedException, ExecutionException {
    final ResultT result = future.get();
    return result;
  }

  /** Waits on the internal future for at most the given timeout and returns its result. */
  @Override
  public final ResultT get(long timeout, TimeUnit unit)
      throws InterruptedException, ExecutionException, TimeoutException {
    final ResultT result = future.get(timeout, unit);
    return result;
  }

  /**
   * Cancels the call through the no-argument cancel().
   *
   * <p>The mayInterruptIfRunning argument is not used. A ChannelOutputException raised by cancel()
   * is logged as a warning rather than propagated, and the call is reported as cancelled.
   *
   * @return the result of cancel(), or true if cancel() threw a ChannelOutputException
   */
  @Override
  public final boolean cancel(boolean mayInterruptIfRunning) {
    boolean cancelled;
    try {
      cancelled = this.cancel();
    } catch (ChannelOutputException e) {
      logger.atWarning().withCause(e).log("Failed to send cancellation packet for %s", rpc());
      // The future was already cancelled because handleError() ran before the exception surfaced.
      cancelled = true;
    }
    return cancelled;
  }

  /** Gives the nested subclasses access to the internal future so they can complete it. */
  final SettableFuture<ResultT> future() {
    return future;
  }

  /**
   * Records the send failure in the future and then aborts the call.
   *
   * <p>The order matters: the exception is stored first, so the doHandleError() that follows the
   * ABORTED status cannot replace it.
   */
  @Override
  void handleExceptionOnInitialPacket(ChannelOutputException e) {
    // Store the original failure in the future before anything else can complete it.
    future.setException(e);

    // Mark the call as completed with a status. Since the future already holds an exception,
    // doHandleError() has no effect on it.
    handleError(Status.ABORTED);
  }

  /** Fails the future with an RpcError built from this call's RPC and error status. */
  @Override
  public void doHandleError() {
    final RpcError rpcError = new RpcError(rpc(), error());
    future.setException(rpcError);
  }

  /** Future-based Call class for unary and client streaming RPCs. */
  static class UnaryResponseFuture<RequestT extends MessageLite, ResponseT extends MessageLite>
      extends FutureCall<RequestT, ResponseT, UnaryResult<ResponseT>>
      implements ClientStreamingFuture<RequestT, ResponseT> {
    /** The single response payload; stays null until the first response is handled. */
    @Nullable ResponseT response = null;

    UnaryResponseFuture(Endpoint endpoint, PendingRpc rpc) {
      super(endpoint, rpc);
    }

    /** Stores the first response; any response after that fails the future. */
    @Override
    public void doHandleNext(ResponseT value) {
      if (response != null) {
        // A response was stored earlier, so this one is unexpected.
        future().setException(new IllegalStateException("Unary RPC received multiple responses."));
        return;
      }
      response = value;
    }

    /**
     * Completes the future with the stored response and the call status, or fails it when no
     * response was stored.
     */
    @Override
    public void doHandleCompleted() {
      if (response != null) {
        future().set(UnaryResult.create(response, status()));
        return;
      }
      future().setException(
          new IllegalStateException("Unary RPC completed without a response payload"));
    }
  }

  /** Future-based Call class for server and bidirectional streaming RPCs. */
  static class StreamResponseFuture<RequestT extends MessageLite, ResponseT extends MessageLite>
      extends FutureCall<RequestT, ResponseT, Status>
      implements BidirectionalStreamingFuture<RequestT> {
    /** Callback that receives every response handled by this call. */
    private final Consumer<ResponseT> onNext;

    /**
     * Returns a factory that creates StreamResponseFuture instances which forward each response
     * to the given consumer.
     */
    static <RequestT extends MessageLite, ResponseT extends MessageLite>
        BiFunction<Endpoint, PendingRpc, StreamResponseFuture<RequestT, ResponseT>> getFactory(
            Consumer<ResponseT> onNext) {
      return (endpoint, rpc) -> new StreamResponseFuture<>(endpoint, rpc, onNext);
    }

    private StreamResponseFuture(Endpoint endpoint, PendingRpc rpc, Consumer<ResponseT> onNext) {
      super(endpoint, rpc);
      this.onNext = onNext;
    }

    /** Hands the response to the onNext consumer. */
    @Override
    public void doHandleNext(ResponseT value) {
      onNext.accept(value);
    }

    /** Completes the future with the call's status. */
    @Override
    public void doHandleCompleted() {
      final Status finalStatus = status();
      future().set(finalStatus);
    }
  }
}