aboutsummaryrefslogtreecommitdiff
path: root/avatar
diff options
context:
space:
mode:
authoruael <uael@google.com>2023-08-02 17:26:47 +0000
committerLucas Abel <22837557+uael@users.noreply.github.com>2023-08-02 11:01:01 -0700
commit1a948edc2fe8994274e29ce066cb14d5d68aab6d (patch)
tree23e34130b2910e29496c59d8cdc06ff5df14ea88 /avatar
parentbd11b55945ec904e0246c5941b88db1e70bb611a (diff)
downloadavatar-1a948edc2fe8994274e29ce066cb14d5d68aab6d.tar.gz
metrics: workaround gRPC bug https://github.com/grpc/grpc/pull/33951
This need to be reverted as soon as the bug fix is present in `grpcio` pypi release.
Diffstat (limited to 'avatar')
-rw-r--r--avatar/metrics/interceptors.py24
1 files changed, 20 insertions, 4 deletions
diff --git a/avatar/metrics/interceptors.py b/avatar/metrics/interceptors.py
index 31a194d..e853395 100644
--- a/avatar/metrics/interceptors.py
+++ b/avatar/metrics/interceptors.py
@@ -15,8 +15,10 @@
"""Avatar metrics interceptors."""
import grpc
+import time
from avatar.metrics.trace import Callsite
+from grpc.aio import ClientCallDetails
from pandora import _utils as utils
from typing import (
TYPE_CHECKING,
@@ -58,10 +60,6 @@ def aio_interceptors(device: PandoraClient) -> Sequence[grpc.aio.ClientIntercept
return [AioUnaryUnaryInterceptor(device), AioUnaryStreamInterceptor(device), AioStreamStreamInterceptor(device)]
-class ClientCallDetails(Protocol):
- method: Union[bytes, str]
-
-
class UnaryOutcome(Protocol, Generic[_T_co]):
def result(self) -> _T_co:
...
@@ -195,6 +193,15 @@ class AioUnaryStreamInterceptor(grpc.aio.UnaryStreamClientInterceptor): # type:
client_call_details: ClientCallDetails,
request: _T,
) -> utils.AioStream[_U]:
+
+ # TODO: this is a workaround for https://github.com/grpc/grpc/pull/33951
+ # need to be deleted as soon as `grpcio` contains the fix.
+ now = time.time()
+ if client_call_details.timeout and client_call_details.timeout > now:
+ client_call_details = client_call_details._replace(
+ timeout=client_call_details.timeout - now,
+ )
+
callsite = Callsite(self.device, client_call_details.method, request)
call = await continuation(client_call_details, request)
call.add_done_callback(lambda _: callsite.end(None)) # type: ignore
@@ -234,6 +241,15 @@ class AioStreamStreamInterceptor(grpc.aio.StreamStreamClientInterceptor): # typ
client_call_details: ClientCallDetails,
request: utils.AioSender[_T],
) -> utils.AioStreamStream[_T, _U]:
+
+ # TODO: this is a workaround for https://github.com/grpc/grpc/pull/33951
+ # need to be deleted as soon as `grpcio` contains the fix.
+ now = time.time()
+ if client_call_details.timeout and client_call_details.timeout > now:
+ client_call_details = client_call_details._replace(
+ timeout=client_call_details.timeout - now,
+ )
+
callsite = Callsite(self.device, client_call_details.method, None)
class RequestProxy: