diff options
Diffstat (limited to 'pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java')
-rw-r--r-- | pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java | 870 |
1 files changed, 515 insertions, 355 deletions
diff --git a/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java b/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java index 3a61b362a..898f0e9b0 100644 --- a/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java +++ b/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java @@ -92,11 +92,11 @@ public final class TransferClientTest { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS); - assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.LEGACY)); + assertThat(lastChunks()).containsExactly(initialLegacyReadChunk(1)); receiveReadServerError(Status.FAILED_PRECONDITION); - assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.LEGACY)); + assertThat(lastChunks()).containsExactly(initialLegacyReadChunk(1)); receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 1) .setOffset(0) @@ -112,7 +112,7 @@ public final class TransferClientTest { TransferParameters params = TransferParameters.create(50, 50, 0); ListenableFuture<byte[]> future = transferClient.read(1, params); - assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.LEGACY, params)); + assertThat(lastChunks()).containsExactly(initialLegacyReadChunk(1, params)); receiveReadChunks(legacyDataChunk(1, TEST_DATA_100B, 0, 50)); @@ -140,7 +140,7 @@ public final class TransferClientTest { receiveReadServerError(Status.FAILED_PRECONDITION); } - Chunk initialChunk = initialReadChunk(1, ProtocolVersion.LEGACY); + Chunk initialChunk = initialLegacyReadChunk(1); assertThat(lastChunks()) .containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk)); @@ -207,7 +207,7 @@ public final class TransferClientTest { TransferParameters params = TransferParameters.create(3, 2, 1); ListenableFuture<byte[]> future = transferClient.read(99, params); - assertThat(lastChunks()).containsExactly(initialReadChunk(99, ProtocolVersion.LEGACY, params)); + assertThat(lastChunks()).containsExactly(initialLegacyReadChunk(99, params)); assertThat(future.cancel(true)).isTrue(); } @@ -216,7 +216,7 @@ public final class TransferClientTest { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS); - assertThat(lastChunks()).containsExactly(initialReadChunk(123, ProtocolVersion.LEGACY)); + assertThat(lastChunks()).containsExactly(initialLegacyReadChunk(123)); receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123) .setOffset(0) @@ -291,7 +291,7 @@ public final class TransferClientTest { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS); - assertThat(lastChunks()).containsExactly(initialReadChunk(123, ProtocolVersion.LEGACY)); + assertThat(lastChunks()).containsExactly(initialLegacyReadChunk(123)); receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(30, 50))); @@ -448,9 +448,8 @@ public final class TransferClientTest { // read should have retried sending the transfer parameters 2 times, for a total of 3 assertThat(lastChunks()) - .containsExactly(initialReadChunk(123, ProtocolVersion.LEGACY), - initialReadChunk(123, ProtocolVersion.LEGACY), - initialReadChunk(123, ProtocolVersion.LEGACY)); + .containsExactly( + initialLegacyReadChunk(123), initialLegacyReadChunk(123), initialLegacyReadChunk(123)); } @Test @@ -491,13 +490,11 @@ public final class TransferClientTest { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); - assertThat(lastChunks()) - .containsExactly(initialWriteChunk(2, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size())); + assertThat(lastChunks()).containsExactly(initialLegacyWriteChunk(2, TEST_DATA_SHORT.size())); receiveWriteServerError(Status.FAILED_PRECONDITION); - assertThat(lastChunks()) - .containsExactly(initialWriteChunk(2, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size())); + assertThat(lastChunks()).containsExactly(initialLegacyWriteChunk(2, TEST_DATA_SHORT.size())); receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2) .setOffset(0) @@ -519,7 +516,7 @@ public final class TransferClientTest { .setMaxChunkSizeBytes(50)); assertThat(lastChunks()) - .containsExactly(initialWriteChunk(2, ProtocolVersion.LEGACY, TEST_DATA_100B.size()), + .containsExactly(initialLegacyWriteChunk(2, TEST_DATA_100B.size()), legacyDataChunk(2, TEST_DATA_100B, 0, 50)); receiveWriteServerError(Status.FAILED_PRECONDITION); @@ -538,7 +535,7 @@ public final class TransferClientTest { receiveWriteServerError(Status.FAILED_PRECONDITION); } - Chunk initialChunk = initialWriteChunk(2, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size()); + Chunk initialChunk = initialLegacyWriteChunk(2, TEST_DATA_SHORT.size()); assertThat(lastChunks()) .containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk)); @@ -559,7 +556,7 @@ public final class TransferClientTest { receiveWriteChunks(legacyFinalChunk(2, Status.OK)); - assertThat(lastChunks()).containsExactly(initialWriteChunk(2, ProtocolVersion.LEGACY, 0)); + assertThat(lastChunks()).containsExactly(initialLegacyWriteChunk(2, 0)); assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. } @@ -569,8 +566,7 @@ public final class TransferClientTest { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray()); - assertThat(lastChunks()) - .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size())); + assertThat(lastChunks()).containsExactly(initialLegacyWriteChunk(123, TEST_DATA_100B.size())); receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) .setOffset(0) @@ -615,8 +611,7 @@ public final class TransferClientTest { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray()); - assertThat(lastChunks()) - .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size())); + assertThat(lastChunks()).containsExactly(initialLegacyWriteChunk(123, TEST_DATA_100B.size())); receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) .setOffset(0) @@ -664,8 +659,7 @@ public final class TransferClientTest { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY); ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray()); - assertThat(lastChunks()) - .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size())); + assertThat(lastChunks()).containsExactly(initialLegacyWriteChunk(123, TEST_DATA_100B.size())); receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) .setOffset(0) @@ -734,7 +728,7 @@ public final class TransferClientTest { .setMaxChunkSizeBytes(25)); assertThat(lastChunks()) - .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size()), + .containsExactly(initialLegacyWriteChunk(123, TEST_DATA_100B.size()), newLegacyChunk(Chunk.Type.DATA, 123).setOffset(100).setRemainingBytes(0).build()); assertThat(future.isDone()).isFalse(); } @@ -800,7 +794,7 @@ public final class TransferClientTest { .setMaxChunkSizeBytes(25)); assertThat(lastChunks()) - .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size()), + .containsExactly(initialLegacyWriteChunk(123, TEST_DATA_100B.size()), legacyFinalChunk(123, Status.OUT_OF_RANGE)); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); @@ -874,10 +868,9 @@ public final class TransferClientTest { // Client should have resent the last chunk (the initial chunk in this case) for each timeout. assertThat(lastChunks()) - .containsExactly( - initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size()), // initial - initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size()), // retry 1 - initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size())); // retry 2 + .containsExactly(initialLegacyWriteChunk(123, TEST_DATA_SHORT.size()), // initial + initialLegacyWriteChunk(123, TEST_DATA_SHORT.size()), // retry 1 + initialLegacyWriteChunk(123, TEST_DATA_SHORT.size())); // retry 2 } @Test @@ -901,8 +894,7 @@ public final class TransferClientTest { .setRemainingBytes(0) .build(); assertThat(lastChunks()) - .containsExactly( - initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size()), // initial + .containsExactly(initialLegacyWriteChunk(123, TEST_DATA_SHORT.size()), // initial data, // data chunk data, // retry 1 data); // retry 2 @@ -944,7 +936,7 @@ public final class TransferClientTest { assertThat(lastChunks()) .containsExactly( // initial - initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size()), + initialLegacyWriteChunk(123, TEST_DATA_100B.size()), // after 2, receive parameters: 40 from 0 by 20 legacyDataChunk(123, TEST_DATA_100B, 0, 20), // data 0-20 legacyDataChunk(123, TEST_DATA_100B, 20, 40), // data 20-40 @@ -966,29 +958,33 @@ public final class TransferClientTest { public void read_singleChunk_successful() throws Exception { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<byte[]> future = transferClient.read(3, TRANSFER_PARAMETERS); + ReadTransfer transfer = transferClient.getReadTransferForTest(future); assertThat(future.isDone()).isFalse(); - assertThat(lastChunks()).containsExactly(initialReadChunk(3, ProtocolVersion.VERSION_TWO)); + assertThat(lastChunks()).containsExactly(initialReadChunk(transfer)); - receiveReadChunks(newChunk(Chunk.Type.START_ACK, 321) + receiveReadChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId()) .setResourceId(3) .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); - assertThat(lastChunks()).containsExactly(readStartAckConfirmation(321, TRANSFER_PARAMETERS)); + assertThat(lastChunks()) + .containsExactly(readStartAckConfirmation(transfer.getSessionId(), TRANSFER_PARAMETERS)); - receiveReadChunks( - newChunk(Chunk.Type.DATA, 321).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0)); + receiveReadChunks(newChunk(Chunk.Type.DATA, transfer.getSessionId()) + .setOffset(0) + .setData(TEST_DATA_SHORT) + .setRemainingBytes(0)); assertThat(lastChunks()) .containsExactly(Chunk.newBuilder() .setType(Chunk.Type.COMPLETION) - .setSessionId(321) + .setSessionId(transfer.getSessionId()) .setStatus(Status.OK.ordinal()) .build()); assertThat(future.isDone()).isFalse(); - receiveReadChunks(newChunk(Chunk.Type.COMPLETION_ACK, 321)); + receiveReadChunks(newChunk(Chunk.Type.COMPLETION_ACK, transfer.getSessionId())); assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); } @@ -997,9 +993,10 @@ public final class TransferClientTest { public void read_requestV2ReceiveLegacy() throws Exception { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS); + ReadTransfer transfer = transferClient.getReadTransferForTest(future); assertThat(future.isDone()).isFalse(); - assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO)); + assertThat(lastChunks()).containsExactly(initialReadChunk(transfer)); receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 1) .setOffset(0) @@ -1016,19 +1013,21 @@ public final class TransferClientTest { public void read_failedPreconditionError_retriesInitialPacket() { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS); + ReadTransfer transfer = transferClient.getReadTransferForTest(future); - assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO)); + assertThat(lastChunks()).containsExactly(initialReadChunk(transfer)); for (int i = 0; i < MAX_RETRIES; ++i) { receiveReadServerError(Status.FAILED_PRECONDITION); - assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO)); + assertThat(lastChunks()).containsExactly(initialReadChunk(transfer)); } - receiveReadChunks(newChunk(Chunk.Type.START_ACK, 54321) + receiveReadChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId()) .setResourceId(1) .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); - assertThat(lastChunks()).containsExactly(readStartAckConfirmation(54321, TRANSFER_PARAMETERS)); + assertThat(lastChunks()) + .containsExactly(readStartAckConfirmation(transfer.getSessionId(), TRANSFER_PARAMETERS)); } @Test @@ -1036,11 +1035,11 @@ public final class TransferClientTest { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); TransferParameters params = TransferParameters.create(50, 50, 0); ListenableFuture<byte[]> future = transferClient.read(1, params); + ReadTransfer transfer = transferClient.getReadTransferForTest(future); - assertThat(lastChunks()) - .containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO, params)); + assertThat(lastChunks()).containsExactly(initialReadChunk(transfer)); - receiveReadChunks(newChunk(Chunk.Type.START_ACK, 555) + receiveReadChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId()) .setResourceId(1) .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); @@ -1056,20 +1055,21 @@ public final class TransferClientTest { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); TransferParameters params = TransferParameters.create(50, 50, 0); ListenableFuture<byte[]> future = transferClient.read(1, params); + ReadTransfer transfer = transferClient.getReadTransferForTest(future); - assertThat(lastChunks()) - .containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO, params)); + assertThat(lastChunks()).containsExactly(initialReadChunk(transfer)); - receiveReadChunks(newChunk(Chunk.Type.START_ACK, 555) + receiveReadChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId()) .setResourceId(1) .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); - assertThat(lastChunks()).containsExactly(readStartAckConfirmation(555, params)); + assertThat(lastChunks()) + .containsExactly(readStartAckConfirmation(transfer.getSessionId(), params)); - receiveReadChunks(dataChunk(555, TEST_DATA_100B, 0, 50)); + receiveReadChunks(dataChunk(transfer.getSessionId(), TEST_DATA_100B, 0, 50)); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 555) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId()) .setOffset(50) .setWindowEndOffset(100) .setMaxChunkSizeBytes(50) @@ -1086,12 +1086,13 @@ public final class TransferClientTest { public void read_failedPreconditionErrorMaxRetriesTimes_aborts() { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS); + ReadTransfer transfer = transferClient.getReadTransferForTest(future); for (int i = 0; i < MAX_RETRIES; ++i) { receiveReadServerError(Status.FAILED_PRECONDITION); } - Chunk initialChunk = initialReadChunk(1, ProtocolVersion.VERSION_TWO); + Chunk initialChunk = initialReadChunk(transfer); assertThat(lastChunks()) .containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk)); @@ -1108,23 +1109,38 @@ public final class TransferClientTest { public void read_singleChunk_ignoresUnknownIdOrWriteChunks() throws Exception { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<byte[]> future = transferClient.read(1); + ReadTransfer transfer = transferClient.getReadTransferForTest(future); assertThat(future.isDone()).isFalse(); - performReadStartHandshake(1, 99); + performReadStartHandshake(transfer); - receiveReadChunks(finalChunk(2, Status.OK), - newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0), - newChunk(Chunk.Type.DATA, 3).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0)); - receiveWriteChunks(finalChunk(99, Status.INVALID_ARGUMENT), - newChunk(Chunk.Type.DATA, 99).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0), - newChunk(Chunk.Type.DATA, 2).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0)); + receiveReadChunks(finalChunk(transfer.getSessionId() + 1, Status.OK), + newChunk(Chunk.Type.DATA, transfer.getSessionId() + 2) + .setOffset(0) + .setData(TEST_DATA_100B) + .setRemainingBytes(0), + newChunk(Chunk.Type.DATA, transfer.getSessionId() + 3) + .setOffset(0) + .setData(TEST_DATA_100B) + .setRemainingBytes(0)); + receiveWriteChunks(finalChunk(transfer.getSessionId(), Status.INVALID_ARGUMENT), + newChunk(Chunk.Type.DATA, transfer.getSessionId()) + .setOffset(0) + .setData(TEST_DATA_100B) + .setRemainingBytes(0), + newChunk(Chunk.Type.DATA, transfer.getSessionId() + 1) + .setOffset(0) + .setData(TEST_DATA_100B) + .setRemainingBytes(0)); assertThat(future.isDone()).isFalse(); - receiveReadChunks( - newChunk(Chunk.Type.DATA, 99).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0)); + receiveReadChunks(newChunk(Chunk.Type.DATA, transfer.getSessionId()) + .setOffset(0) + .setData(TEST_DATA_SHORT) + .setRemainingBytes(0)); - performReadCompletionHandshake(99, Status.OK); + performReadCompletionHandshake(transfer.getSessionId(), Status.OK); assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); } @@ -1133,10 +1149,11 @@ public final class TransferClientTest { public void read_empty() throws Exception { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<byte[]> future = transferClient.read(2); - performReadStartHandshake(2, 5678); - receiveReadChunks(newChunk(Chunk.Type.DATA, 5678).setRemainingBytes(0)); + ReadTransfer transfer = transferClient.getReadTransferForTest(future); + performReadStartHandshake(transfer); + receiveReadChunks(newChunk(Chunk.Type.DATA, transfer.getSessionId()).setRemainingBytes(0)); - performReadCompletionHandshake(5678, Status.OK); + performReadCompletionHandshake(transfer.getSessionId(), Status.OK); assertThat(future.get()).isEqualTo(new byte[] {}); } @@ -1146,9 +1163,9 @@ public final class TransferClientTest { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); TransferParameters params = TransferParameters.create(3, 2, 1); ListenableFuture<byte[]> future = transferClient.read(99, params); + ReadTransfer transfer = transferClient.getReadTransferForTest(future); - assertThat(lastChunks()) - .containsExactly(initialReadChunk(99, ProtocolVersion.VERSION_TWO, params)); + assertThat(lastChunks()).containsExactly(initialReadChunk(transfer)); assertThat(future.cancel(true)).isTrue(); } @@ -1156,33 +1173,39 @@ public final class TransferClientTest { public void read_severalChunks() throws Exception { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<byte[]> future = transferClient.read(7, TRANSFER_PARAMETERS); + ReadTransfer transfer = transferClient.getReadTransferForTest(future); - performReadStartHandshake(7, 123, TRANSFER_PARAMETERS); + performReadStartHandshake(transfer); - receiveReadChunks( - newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 20)).setRemainingBytes(70), - newChunk(Chunk.Type.DATA, 123).setOffset(20).setData(range(20, 40))); + receiveReadChunks(newChunk(Chunk.Type.DATA, transfer.getSessionId()) + .setOffset(0) + .setData(range(0, 20)) + .setRemainingBytes(70), + newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(20).setData(range(20, 40))); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, transfer.getSessionId()) .setOffset(40) .setMaxChunkSizeBytes(30) .setWindowEndOffset(90) .build()); - receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(40).setData(range(40, 70))); + receiveReadChunks( + newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(40).setData(range(40, 70))); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, transfer.getSessionId()) .setOffset(70) .setMaxChunkSizeBytes(30) .setWindowEndOffset(120) .build()); - receiveReadChunks( - newChunk(Chunk.Type.DATA, 123).setOffset(70).setData(range(70, 100)).setRemainingBytes(0)); + receiveReadChunks(newChunk(Chunk.Type.DATA, transfer.getSessionId()) + .setOffset(70) + .setData(range(70, 100)) + .setRemainingBytes(0)); - performReadCompletionHandshake(123, Status.OK); + performReadCompletionHandshake(transfer.getSessionId(), Status.OK); assertThat(future.get()).isEqualTo(TEST_DATA_100B.toByteArray()); } @@ -1192,113 +1215,115 @@ public final class TransferClientTest { createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO); TransferParameters params = TransferParameters.create(50, 10, 0); + final int id = transferClient.getNextSessionIdForTest(); + // Handshake enqueueReadChunks(2, // Wait for read RPC open & START packet - newChunk(Chunk.Type.START_ACK, 99) + newChunk(Chunk.Type.START_ACK, id) .setResourceId(7) .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); enqueueReadChunks(1, // Ignore the first START_ACK_CONFIRMATION - newChunk(Chunk.Type.START_ACK, 99) + newChunk(Chunk.Type.START_ACK, id) .setResourceId(7) .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); // Window 1: server waits for START_ACK_CONFIRMATION, drops 2nd packet enqueueReadChunks(1, - newChunk(Chunk.Type.DATA, 99).setOffset(0).setData(range(0, 10)), - newChunk(Chunk.Type.DATA, 99).setOffset(20).setData(range(20, 30)), - newChunk(Chunk.Type.DATA, 99).setOffset(30).setData(range(30, 40)), - newChunk(Chunk.Type.DATA, 99).setOffset(40).setData(range(40, 50))); + newChunk(Chunk.Type.DATA, id).setOffset(0).setData(range(0, 10)), + newChunk(Chunk.Type.DATA, id).setOffset(20).setData(range(20, 30)), + newChunk(Chunk.Type.DATA, id).setOffset(30).setData(range(30, 40)), + newChunk(Chunk.Type.DATA, id).setOffset(40).setData(range(40, 50))); // Window 2: server waits for retransmit, drops 1st packet enqueueReadChunks(1, - newChunk(Chunk.Type.DATA, 99).setOffset(20).setData(range(20, 30)), - newChunk(Chunk.Type.DATA, 99).setOffset(30).setData(range(30, 40)), - newChunk(Chunk.Type.DATA, 99).setOffset(40).setData(range(40, 50)), - newChunk(Chunk.Type.DATA, 99).setOffset(50).setData(range(50, 60))); + newChunk(Chunk.Type.DATA, id).setOffset(20).setData(range(20, 30)), + newChunk(Chunk.Type.DATA, id).setOffset(30).setData(range(30, 40)), + newChunk(Chunk.Type.DATA, id).setOffset(40).setData(range(40, 50)), + newChunk(Chunk.Type.DATA, id).setOffset(50).setData(range(50, 60))); // Window 3: server waits for retransmit, drops last packet enqueueReadChunks(1, - newChunk(Chunk.Type.DATA, 99).setOffset(10).setData(range(10, 20)), - newChunk(Chunk.Type.DATA, 99).setOffset(20).setData(range(20, 30)), - newChunk(Chunk.Type.DATA, 99).setOffset(30).setData(range(30, 40)), - newChunk(Chunk.Type.DATA, 99).setOffset(40).setData(range(40, 50))); + newChunk(Chunk.Type.DATA, id).setOffset(10).setData(range(10, 20)), + newChunk(Chunk.Type.DATA, id).setOffset(20).setData(range(20, 30)), + newChunk(Chunk.Type.DATA, id).setOffset(30).setData(range(30, 40)), + newChunk(Chunk.Type.DATA, id).setOffset(40).setData(range(40, 50))); // Window 4: server waits for continue and retransmit, normal window. enqueueReadChunks(2, - newChunk(Chunk.Type.DATA, 99).setOffset(50).setData(range(50, 60)), - newChunk(Chunk.Type.DATA, 99).setOffset(60).setData(range(60, 70)), - newChunk(Chunk.Type.DATA, 99).setOffset(70).setData(range(70, 80)), - newChunk(Chunk.Type.DATA, 99).setOffset(80).setData(range(80, 90)), - newChunk(Chunk.Type.DATA, 99).setOffset(90).setData(range(90, 100))); + newChunk(Chunk.Type.DATA, id).setOffset(50).setData(range(50, 60)), + newChunk(Chunk.Type.DATA, id).setOffset(60).setData(range(60, 70)), + newChunk(Chunk.Type.DATA, id).setOffset(70).setData(range(70, 80)), + newChunk(Chunk.Type.DATA, id).setOffset(80).setData(range(80, 90)), + newChunk(Chunk.Type.DATA, id).setOffset(90).setData(range(90, 100))); enqueueReadChunks(2, // Ignore continue and retransmit chunks, retry last packet in window - newChunk(Chunk.Type.DATA, 99).setOffset(90).setData(range(90, 100)), - newChunk(Chunk.Type.DATA, 99).setOffset(90).setData(range(90, 100))); + newChunk(Chunk.Type.DATA, id).setOffset(90).setData(range(90, 100)), + newChunk(Chunk.Type.DATA, id).setOffset(90).setData(range(90, 100))); // Window 5: Final packet enqueueReadChunks(2, // Receive two retries, then send final packet - newChunk(Chunk.Type.DATA, 99).setOffset(100).setData(range(100, 110)).setRemainingBytes(0)); + newChunk(Chunk.Type.DATA, id).setOffset(100).setData(range(100, 110)).setRemainingBytes(0)); enqueueReadChunks(1, // Ignore first COMPLETION packet - newChunk(Chunk.Type.DATA, 99).setOffset(100).setData(range(100, 110)).setRemainingBytes(0)); - enqueueReadChunks(1, newChunk(Chunk.Type.COMPLETION_ACK, 99)); + newChunk(Chunk.Type.DATA, id).setOffset(100).setData(range(100, 110)).setRemainingBytes(0)); + enqueueReadChunks(1, newChunk(Chunk.Type.COMPLETION_ACK, id)); ListenableFuture<byte[]> future = transferClient.read(7, params); - // assertThat(future.get()).isEqualTo(range(0, 110).toByteArray()); - while (!future.isDone()) { - } + ReadTransfer transfer = transferClient.getReadTransferForTest(future); + assertThat(transfer.getSessionId()).isEqualTo(id); + assertThat(future.get()).isEqualTo(range(0, 110).toByteArray()); assertThat(lastChunks()) .containsExactly( // Handshake - initialReadChunk(7, ProtocolVersion.VERSION_TWO, params), - readStartAckConfirmation(99, params), - readStartAckConfirmation(99, params), + initialReadChunk(transfer), + readStartAckConfirmation(id, params), + readStartAckConfirmation(id, params), // Window 1: send one transfer parameters update after the drop - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id) .setOffset(10) .setWindowEndOffset(60) .setMaxChunkSizeBytes(10) .build(), // Window 2: send one transfer parameters update after the drop - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id) .setOffset(10) .setWindowEndOffset(60) .setMaxChunkSizeBytes(10) .build(), // Window 3: send one transfer parameters update after the drop, then continue packet - newChunk(Chunk.Type.PARAMETERS_CONTINUE, 99) // Not seen by server + newChunk(Chunk.Type.PARAMETERS_CONTINUE, id) // Not seen by server .setOffset(40) .setWindowEndOffset(90) .setMaxChunkSizeBytes(10) .build(), - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) // Sent after timeout + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id) // Sent after timeout .setOffset(50) .setWindowEndOffset(100) .setMaxChunkSizeBytes(10) .build(), // Window 4: send one transfer parameters update after the drop, then continue packet - newChunk(Chunk.Type.PARAMETERS_CONTINUE, 99) // Ignored by server + newChunk(Chunk.Type.PARAMETERS_CONTINUE, id) // Ignored by server .setOffset(80) .setWindowEndOffset(130) .setMaxChunkSizeBytes(10) .build(), - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) // Sent after last packet + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id) // Sent after last packet .setOffset(100) .setWindowEndOffset(150) .setMaxChunkSizeBytes(10) .build(), - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) // Sent due to repeated packet + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id) // Sent due to repeated packet .setOffset(100) .setWindowEndOffset(150) .setMaxChunkSizeBytes(10) .build(), - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) // Sent due to repeated packet + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id) // Sent due to repeated packet .setOffset(100) .setWindowEndOffset(150) .setMaxChunkSizeBytes(10) .build(), // Window 5: final packet and closing handshake - newChunk(Chunk.Type.COMPLETION, 99).setStatus(Status.OK.ordinal()).build(), - newChunk(Chunk.Type.COMPLETION, 99).setStatus(Status.OK.ordinal()).build()); + newChunk(Chunk.Type.COMPLETION, id).setStatus(Status.OK.ordinal()).build(), + newChunk(Chunk.Type.COMPLETION, id).setStatus(Status.OK.ordinal()).build()); } @Test @@ -1306,21 +1331,31 @@ public final class TransferClientTest { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS, progressCallback); + ReadTransfer transfer = transferClient.getReadTransferForTest(future); - performReadStartHandshake(123, 123, TRANSFER_PARAMETERS); + performReadStartHandshake(transfer); - receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)), - newChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)), - newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 60)).setRemainingBytes(5), - newChunk(Chunk.Type.DATA, 123).setOffset(60).setData(range(60, 70)), - newChunk(Chunk.Type.DATA, 123).setOffset(70).setData(range(70, 80)).setRemainingBytes(20), - newChunk(Chunk.Type.DATA, 123).setOffset(90).setData(range(90, 100)), - newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30))); + receiveReadChunks( + newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(0).setData(range(0, 30)), + newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(30).setData(range(30, 50)), + newChunk(Chunk.Type.DATA, transfer.getSessionId()) + .setOffset(50) + .setData(range(50, 60)) + .setRemainingBytes(5), + newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(60).setData(range(60, 70)), + newChunk(Chunk.Type.DATA, transfer.getSessionId()) + .setOffset(70) + .setData(range(70, 80)) + .setRemainingBytes(20), + newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(90).setData(range(90, 100)), + newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(0).setData(range(0, 30))); lastChunks(); // Discard chunks; no need to inspect for this test - receiveReadChunks( - newChunk(Chunk.Type.DATA, 123).setOffset(80).setData(range(80, 100)).setRemainingBytes(0)); - performReadCompletionHandshake(123, Status.OK); + receiveReadChunks(newChunk(Chunk.Type.DATA, transfer.getSessionId()) + .setOffset(80) + .setData(range(80, 100)) + .setRemainingBytes(0)); + performReadCompletionHandshake(transfer.getSessionId(), Status.OK); verify(progressCallback, times(6)).accept(progress.capture()); assertThat(progress.getAllValues()) @@ -1338,51 +1373,59 @@ public final class TransferClientTest { public void read_rewindWhenPacketsSkipped() throws Exception { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS); + ReadTransfer transfer = transferClient.getReadTransferForTest(future); - performReadStartHandshake(123, 123, TRANSFER_PARAMETERS); + performReadStartHandshake(transfer); - receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(30, 50))); + receiveReadChunks( + newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(50).setData(range(30, 50))); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId()) .setWindowEndOffset(50) .setMaxChunkSizeBytes(30) .setOffset(0) .build()); - receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)), - newChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50))); + receiveReadChunks( + newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(0).setData(range(0, 30)), + newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(30).setData(range(30, 50))); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, transfer.getSessionId()) .setOffset(30) .setWindowEndOffset(80) .setMaxChunkSizeBytes(30) .build()); - receiveReadChunks( - newChunk(Chunk.Type.DATA, 123).setOffset(80).setData(range(80, 100)).setRemainingBytes(0)); + receiveReadChunks(newChunk(Chunk.Type.DATA, transfer.getSessionId()) + .setOffset(80) + .setData(range(80, 100)) + .setRemainingBytes(0)); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId()) .setOffset(50) .setWindowEndOffset(100) .setMaxChunkSizeBytes(30) .build()); - receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 80))); + receiveReadChunks( + newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(50).setData(range(50, 80))); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123) + .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, transfer.getSessionId()) .setOffset(80) .setWindowEndOffset(130) .setMaxChunkSizeBytes(30) .build()); - receiveReadChunks( - newChunk(Chunk.Type.DATA, 123).setOffset(80).setData(range(80, 100)).setRemainingBytes(0)); + receiveReadChunks(newChunk(Chunk.Type.DATA, transfer.getSessionId()) + .setOffset(80) + .setData(range(80, 100)) + .setRemainingBytes(0)); - performReadCompletionHandshake(123, Status.OK); + performReadCompletionHandshake(transfer.getSessionId(), Status.OK); assertThat(future.get()).isEqualTo(TEST_DATA_100B.toByteArray()); } @@ -1392,15 +1435,16 @@ public final class TransferClientTest { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); for (int i = 0; i < 3; ++i) { ListenableFuture<byte[]> future = transferClient.read(1); + ReadTransfer transfer = transferClient.getReadTransferForTest(future); - performReadStartHandshake(1, 100 + i); + performReadStartHandshake(transfer); - receiveReadChunks(newChunk(Chunk.Type.DATA, 100 + i) + receiveReadChunks(newChunk(Chunk.Type.DATA, transfer.getSessionId()) .setOffset(0) .setData(TEST_DATA_SHORT) .setRemainingBytes(0)); - performReadCompletionHandshake(100 + i, Status.OK); + performReadCompletionHandshake(transfer.getSessionId(), Status.OK); assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray()); } @@ -1435,15 +1479,18 @@ public final class TransferClientTest { public void read_sendErrorOnLaterPacket_aborts() { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<byte[]> future = transferClient.read(1024, TRANSFER_PARAMETERS); + ReadTransfer transfer = transferClient.getReadTransferForTest(future); - performReadStartHandshake(1024, 123, TRANSFER_PARAMETERS); + performReadStartHandshake(transfer); - receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 20))); + receiveReadChunks( + newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(0).setData(range(0, 20))); ChannelOutputException exception = new ChannelOutputException("blah"); rpcClient.setChannelOutputException(exception); - receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(20).setData(range(20, 50))); + receiveReadChunks( + newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(20).setData(range(20, 50))); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); @@ -1454,21 +1501,22 @@ public final class TransferClientTest { public void read_cancelFuture_abortsTransfer() { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS); + ReadTransfer transfer = transferClient.getReadTransferForTest(future); - performReadStartHandshake(1, 123, TRANSFER_PARAMETERS); + performReadStartHandshake(transfer); assertThat(future.cancel(true)).isTrue(); - assertThat(lastChunks()).contains(finalChunk(123, Status.CANCELLED)); + assertThat(lastChunks()).contains(finalChunk(transfer.getSessionId(), Status.CANCELLED)); } @Test public void read_immediateTransferProtocolError_aborts() { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<byte[]> future = transferClient.read(123); + ReadTransfer transfer = transferClient.getReadTransferForTest(future); - // Resource ID will be set since session ID hasn't been assigned yet. - receiveReadChunks(newChunk(Chunk.Type.COMPLETION, VersionedChunk.UNASSIGNED_SESSION_ID) + receiveReadChunks(newChunk(Chunk.Type.COMPLETION, transfer.getSessionId()) .setResourceId(123) .setStatus(Status.ALREADY_EXISTS.ordinal())); @@ -1482,10 +1530,11 @@ public final class TransferClientTest { public void read_laterTransferProtocolError_aborts() { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<byte[]> future = transferClient.read(123); + ReadTransfer transfer = transferClient.getReadTransferForTest(future); - performReadStartHandshake(123, 514); + performReadStartHandshake(transfer); - receiveReadChunks(finalChunk(514, Status.ALREADY_EXISTS)); + receiveReadChunks(finalChunk(transfer.getSessionId(), Status.ALREADY_EXISTS)); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); @@ -1508,14 +1557,16 @@ public final class TransferClientTest { public void read_serverRespondsWithUnknownVersion_invalidArgument() { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<byte[]> future = transferClient.read(2, TRANSFER_PARAMETERS); + ReadTransfer transfer = transferClient.getReadTransferForTest(future); - assertThat(lastChunks()) - .containsExactly(initialReadChunk(2, ProtocolVersion.VERSION_TWO, TRANSFER_PARAMETERS)); + assertThat(lastChunks()).containsExactly(initialReadChunk(transfer)); - receiveReadChunks( - newChunk(Chunk.Type.START_ACK, 99).setResourceId(2).setProtocolVersion(600613)); + receiveReadChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId()) + .setResourceId(2) + .setProtocolVersion(600613)); - assertThat(lastChunks()).containsExactly(finalChunk(99, Status.INVALID_ARGUMENT)); + assertThat(lastChunks()) + .containsExactly(finalChunk(transfer.getSessionId(), Status.INVALID_ARGUMENT)); ExecutionException exception = assertThrows(ExecutionException.class, future::get); assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT); @@ -1525,6 +1576,7 @@ public final class TransferClientTest { public void read_timeout() { createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS); + ReadTransfer transfer = transferClient.getReadTransferForTest(future); // Call future.get() without sending any server-side packets. ExecutionException exception = assertThrows(ExecutionException.class, future::get); @@ -1532,45 +1584,65 @@ public final class TransferClientTest { // read should have retried sending the transfer parameters 2 times, for a total of 3 assertThat(lastChunks()) - .containsExactly(initialReadChunk(123, ProtocolVersion.VERSION_TWO), - initialReadChunk(123, ProtocolVersion.VERSION_TWO), - initialReadChunk(123, ProtocolVersion.VERSION_TWO)); + .containsExactly( + initialReadChunk(transfer), initialReadChunk(transfer), initialReadChunk(transfer)); + } + + @Test + public void read_generatesUniqueSessionIds() { + createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); + + int sessionId1 = transferClient.getNextSessionIdForTest(); + ReadTransfer transfer1 = transferClient.getReadTransferForTest(transferClient.read(123)); + assertThat(sessionId1).isEqualTo(transfer1.getSessionId()); + + int sessionId2 = transferClient.getNextSessionIdForTest(); + ReadTransfer transfer2 = transferClient.getReadTransferForTest(transferClient.read(456)); + assertThat(sessionId2).isEqualTo(transfer2.getSessionId()); + + int sessionId3 = transferClient.getNextSessionIdForTest(); + ReadTransfer transfer3 = transferClient.getReadTransferForTest(transferClient.read(789)); + assertThat(sessionId3).isEqualTo(transfer3.getSessionId()); + + assertThat(sessionId1).isNotEqualTo(sessionId2); + assertThat(sessionId1).isNotEqualTo(sessionId3); } @Test public void write_singleChunk() throws Exception { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); // Do the start handshake (equivalent to performWriteStartHandshake()). - assertThat(lastChunks()) - .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size())); + assertThat(lastChunks()).containsExactly(initialWriteChunk(transfer, TEST_DATA_SHORT.size())); - receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 123) + receiveWriteChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId()) .setResourceId(2) .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.START_ACK_CONFIRMATION, 123) + .containsExactly(newChunk(Chunk.Type.START_ACK_CONFIRMATION, transfer.getSessionId()) .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()) .setRemainingBytes(TEST_DATA_SHORT.size()) .build()); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId()) .setOffset(0) .setWindowEndOffset(1024) .setMaxChunkSizeBytes(128)); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.DATA, 123) + .containsExactly(newChunk(Chunk.Type.DATA, transfer.getSessionId()) .setOffset(0) .setData(TEST_DATA_SHORT) .setRemainingBytes(0) .build()); - receiveWriteChunks(finalChunk(123, Status.OK)); + receiveWriteChunks(finalChunk(transfer.getSessionId(), Status.OK)); - assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 123).build()); + assertThat(lastChunks()) + .containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, transfer.getSessionId()).build()); assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. } @@ -1579,9 +1651,9 @@ public final class TransferClientTest { public void write_requestV2ReceiveLegacy() throws Exception { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); - assertThat(lastChunks()) - .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size())); + assertThat(lastChunks()).containsExactly(initialWriteChunk(transfer, TEST_DATA_SHORT.size())); receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2) .setOffset(0) @@ -1596,10 +1668,11 @@ public final class TransferClientTest { public void write_platformTransferDisabled_aborted() { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); assertThat(future.isDone()).isFalse(); shouldAbortFlag = true; - receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 3).setResourceId(2)); + receiveWriteChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId()).setResourceId(2)); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); @@ -1610,23 +1683,21 @@ public final class TransferClientTest { public void write_failedPreconditionError_retriesInitialPacket() { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); - assertThat(lastChunks()) - .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size())); + assertThat(lastChunks()).containsExactly(initialWriteChunk(transfer, TEST_DATA_SHORT.size())); for (int i = 0; i < MAX_RETRIES; ++i) { receiveWriteServerError(Status.FAILED_PRECONDITION); - assertThat(lastChunks()) - .containsExactly( - initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size())); + assertThat(lastChunks()).containsExactly(initialWriteChunk(transfer, TEST_DATA_SHORT.size())); } - receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 54321) + receiveWriteChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId()) .setResourceId(2) .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.START_ACK_CONFIRMATION, 54321) + .containsExactly(newChunk(Chunk.Type.START_ACK_CONFIRMATION, transfer.getSessionId()) .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()) .setRemainingBytes(TEST_DATA_SHORT.size()) .build()); @@ -1636,11 +1707,11 @@ public final class TransferClientTest { public void write_failedPreconditionError_abortsAfterInitialPacket() { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_100B.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); - assertThat(lastChunks()) - .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size())); + assertThat(lastChunks()).containsExactly(initialWriteChunk(transfer, TEST_DATA_100B.size())); - receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 4) + receiveWriteChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId()) .setResourceId(2) .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); @@ -1655,12 +1726,13 @@ public final class TransferClientTest { public void write_failedPreconditionErrorMaxRetriesTimes_aborts() { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); for (int i = 0; i < MAX_RETRIES; ++i) { receiveWriteServerError(Status.FAILED_PRECONDITION); } - Chunk initialChunk = initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()); + Chunk initialChunk = initialWriteChunk(transfer, TEST_DATA_SHORT.size()); assertThat(lastChunks()) .containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk)); @@ -1677,12 +1749,14 @@ public final class TransferClientTest { public void write_empty() throws Exception { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> future = transferClient.write(2, new byte[] {}); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); - performWriteStartHandshake(2, 123, 0); + performWriteStartHandshake(transfer, 0); - receiveWriteChunks(finalChunk(123, Status.OK)); + receiveWriteChunks(finalChunk(transfer.getSessionId(), Status.OK)); - assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 123).build()); + assertThat(lastChunks()) + .containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, transfer.getSessionId()).build()); assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. } @@ -1691,34 +1765,47 @@ public final class TransferClientTest { public void write_severalChunks() throws Exception { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> future = transferClient.write(500, TEST_DATA_100B.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); - performWriteStartHandshake(500, 123, TEST_DATA_100B.size()); + performWriteStartHandshake(transfer, TEST_DATA_100B.size()); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId()) .setOffset(0) .setWindowEndOffset(50) .setMaxChunkSizeBytes(30) .setMinDelayMicroseconds(1)); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)).build(), - newChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)).build()); + .containsExactly(newChunk(Chunk.Type.DATA, transfer.getSessionId()) + .setOffset(0) + .setData(range(0, 30)) + .build(), + newChunk(Chunk.Type.DATA, transfer.getSessionId()) + .setOffset(30) + .setData(range(30, 50)) + .build()); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId()) .setOffset(50) .setWindowEndOffset(90) .setMaxChunkSizeBytes(25)); assertThat(lastChunks()) - .containsExactly( - newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 75)).build(), - newChunk(Chunk.Type.DATA, 123).setOffset(75).setData(range(75, 90)).build()); + .containsExactly(newChunk(Chunk.Type.DATA, transfer.getSessionId()) + .setOffset(50) + .setData(range(50, 75)) + .build(), + newChunk(Chunk.Type.DATA, transfer.getSessionId()) + .setOffset(75) + .setData(range(75, 90)) + .build()); - receiveWriteChunks( - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(90).setWindowEndOffset(140)); + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId()) + .setOffset(90) + .setWindowEndOffset(140)); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.DATA, 123) + .containsExactly(newChunk(Chunk.Type.DATA, transfer.getSessionId()) .setOffset(90) .setData(range(90, 100)) .setRemainingBytes(0) @@ -1726,9 +1813,10 @@ public final class TransferClientTest { assertThat(future.isDone()).isFalse(); - receiveWriteChunks(finalChunk(123, Status.OK)); + receiveWriteChunks(finalChunk(transfer.getSessionId(), Status.OK)); - assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 123).build()); + assertThat(lastChunks()) + .containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, transfer.getSessionId()).build()); assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. } @@ -1737,32 +1825,43 @@ public final class TransferClientTest { public void write_parametersContinue() throws Exception { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> future = transferClient.write(321, TEST_DATA_100B.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); - performWriteStartHandshake(321, 123, TEST_DATA_100B.size()); + performWriteStartHandshake(transfer, TEST_DATA_100B.size()); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId()) .setOffset(0) .setWindowEndOffset(50) .setMaxChunkSizeBytes(30) .setMinDelayMicroseconds(1)); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)).build(), - newChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)).build()); + .containsExactly(newChunk(Chunk.Type.DATA, transfer.getSessionId()) + .setOffset(0) + .setData(range(0, 30)) + .build(), + newChunk(Chunk.Type.DATA, transfer.getSessionId()) + .setOffset(30) + .setData(range(30, 50)) + .build()); - receiveWriteChunks( - newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123).setOffset(30).setWindowEndOffset(80)); + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_CONTINUE, transfer.getSessionId()) + .setOffset(30) + .setWindowEndOffset(80)); // Transfer doesn't roll back to offset 30 but instead continues sending up to 80. assertThat(lastChunks()) - .containsExactly( - newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 80)).build()); + .containsExactly(newChunk(Chunk.Type.DATA, transfer.getSessionId()) + .setOffset(50) + .setData(range(50, 80)) + .build()); - receiveWriteChunks( - newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123).setOffset(80).setWindowEndOffset(130)); + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_CONTINUE, transfer.getSessionId()) + .setOffset(80) + .setWindowEndOffset(130)); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.DATA, 123) + .containsExactly(newChunk(Chunk.Type.DATA, transfer.getSessionId()) .setOffset(80) .setData(range(80, 100)) .setRemainingBytes(0) @@ -1770,9 +1869,10 @@ public final class TransferClientTest { assertThat(future.isDone()).isFalse(); - receiveWriteChunks(finalChunk(123, Status.OK)); + receiveWriteChunks(finalChunk(transfer.getSessionId(), Status.OK)); - assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 123).build()); + assertThat(lastChunks()) + .containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, transfer.getSessionId()).build()); assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. } @@ -1781,33 +1881,42 @@ public final class TransferClientTest { public void write_continuePacketWithWindowEndBeforeOffsetIsIgnored() throws Exception { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); - performWriteStartHandshake(123, 555, TEST_DATA_100B.size()); + performWriteStartHandshake(transfer, TEST_DATA_100B.size()); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 555) + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId()) .setOffset(0) .setWindowEndOffset(90) .setMaxChunkSizeBytes(90) .setMinDelayMicroseconds(1)); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.DATA, 555).setOffset(0).setData(range(0, 90)).build()); + .containsExactly(newChunk(Chunk.Type.DATA, transfer.getSessionId()) + .setOffset(0) + .setData(range(0, 90)) + .build()); receiveWriteChunks( // This stale packet with a window end before the offset should be ignored. - newChunk(Chunk.Type.PARAMETERS_CONTINUE, 555).setOffset(25).setWindowEndOffset(50), + newChunk(Chunk.Type.PARAMETERS_CONTINUE, transfer.getSessionId()) + .setOffset(25) + .setWindowEndOffset(50), // Start from an arbitrary offset before the current, but extend the window to the end. - newChunk(Chunk.Type.PARAMETERS_CONTINUE, 555).setOffset(80).setWindowEndOffset(100)); + newChunk(Chunk.Type.PARAMETERS_CONTINUE, transfer.getSessionId()) + .setOffset(80) + .setWindowEndOffset(100)); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.DATA, 555) + .containsExactly(newChunk(Chunk.Type.DATA, transfer.getSessionId()) .setOffset(90) .setData(range(90, 100)) .setRemainingBytes(0) .build()); - receiveWriteChunks(finalChunk(555, Status.OK)); - assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 555).build()); + receiveWriteChunks(finalChunk(transfer.getSessionId(), Status.OK)); + assertThat(lastChunks()) + .containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, transfer.getSessionId()).build()); assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. } @@ -1817,15 +1926,18 @@ public final class TransferClientTest { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray(), progressCallback); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); - performWriteStartHandshake(123, 123, TEST_DATA_100B.size()); + performWriteStartHandshake(transfer, TEST_DATA_100B.size()); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId()) .setOffset(0) .setWindowEndOffset(90) .setMaxChunkSizeBytes(30), - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(50).setWindowEndOffset(100), - finalChunk(123, Status.OK)); + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId()) + .setOffset(50) + .setWindowEndOffset(100), + finalChunk(transfer.getSessionId(), Status.OK)); verify(progressCallback, times(6)).accept(progress.capture()); assertThat(progress.getAllValues()) @@ -1842,17 +1954,20 @@ public final class TransferClientTest { public void write_asksForFinalOffset_sendsFinalPacket() { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); - performWriteStartHandshake(123, 456, TEST_DATA_100B.size()); + performWriteStartHandshake(transfer, TEST_DATA_100B.size()); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 456) + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId()) .setOffset(100) .setWindowEndOffset(140) .setMaxChunkSizeBytes(25)); assertThat(lastChunks()) - .containsExactly( - newChunk(Chunk.Type.DATA, 456).setOffset(100).setRemainingBytes(0).build()); + .containsExactly(newChunk(Chunk.Type.DATA, transfer.getSessionId()) + .setOffset(100) + .setRemainingBytes(0) + .build()); } @Test @@ -1860,17 +1975,21 @@ public final class TransferClientTest { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); for (int i = 0; i < 3; ++i) { ListenableFuture<Void> future = transferClient.write(6, TEST_DATA_SHORT.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); - performWriteStartHandshake(6, 123, TEST_DATA_SHORT.size()); + performWriteStartHandshake(transfer, TEST_DATA_SHORT.size()); - receiveWriteChunks( - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(0).setWindowEndOffset(50), - finalChunk(123, Status.OK)); + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId()) + .setOffset(0) + .setWindowEndOffset(50), + finalChunk(transfer.getSessionId(), Status.OK)); assertThat(lastChunks()) - .containsExactly( - newChunk(Chunk.Type.DATA, 123).setData(TEST_DATA_SHORT).setRemainingBytes(0).build(), - newChunk(Chunk.Type.COMPLETION_ACK, 123).build()); + .containsExactly(newChunk(Chunk.Type.DATA, transfer.getSessionId()) + .setData(TEST_DATA_SHORT) + .setRemainingBytes(0) + .build(), + newChunk(Chunk.Type.COMPLETION_ACK, transfer.getSessionId()).build()); future.get(); } @@ -1905,13 +2024,16 @@ public final class TransferClientTest { public void write_serviceRequestsNoData_aborts() { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> future = transferClient.write(7, TEST_DATA_SHORT.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); - performWriteStartHandshake(7, 123, TEST_DATA_SHORT.size()); + performWriteStartHandshake(transfer, TEST_DATA_SHORT.size()); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(0)); + receiveWriteChunks( + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId()).setOffset(0)); - assertThat(lastChunks()).containsExactly(finalChunk(123, Status.INVALID_ARGUMENT)); - receiveWriteChunks(newChunk(Chunk.Type.COMPLETION_ACK, 123)); + assertThat(lastChunks()) + .containsExactly(finalChunk(transfer.getSessionId(), Status.INVALID_ARGUMENT)); + receiveWriteChunks(newChunk(Chunk.Type.COMPLETION_ACK, transfer.getSessionId())); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT); @@ -1921,16 +2043,18 @@ public final class TransferClientTest { public void write_invalidOffset_aborts() { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> future = transferClient.write(7, TEST_DATA_100B.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); - performWriteStartHandshake(7, 123, TEST_DATA_100B.size()); + performWriteStartHandshake(transfer, TEST_DATA_100B.size()); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId()) .setOffset(101) .setWindowEndOffset(141) .setMaxChunkSizeBytes(25)); - assertThat(lastChunks()).containsExactly(finalChunk(123, Status.OUT_OF_RANGE)); - receiveWriteChunks(newChunk(Chunk.Type.COMPLETION_ACK, 123)); + assertThat(lastChunks()) + .containsExactly(finalChunk(transfer.getSessionId(), Status.OUT_OF_RANGE)); + receiveWriteChunks(newChunk(Chunk.Type.COMPLETION_ACK, transfer.getSessionId())); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.OUT_OF_RANGE); @@ -1940,13 +2064,14 @@ public final class TransferClientTest { public void write_sendErrorOnLaterPacket_aborts() { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> future = transferClient.write(7, TEST_DATA_SHORT.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); - performWriteStartHandshake(7, 123, TEST_DATA_SHORT.size()); + performWriteStartHandshake(transfer, TEST_DATA_SHORT.size()); ChannelOutputException exception = new ChannelOutputException("blah"); rpcClient.setChannelOutputException(exception); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId()) .setOffset(0) .setWindowEndOffset(50) .setMaxChunkSizeBytes(30)); @@ -1960,27 +2085,29 @@ public final class TransferClientTest { public void write_cancelFuture_abortsTransfer() { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> future = transferClient.write(7, TEST_DATA_100B.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); - performWriteStartHandshake(7, 123, TEST_DATA_100B.size()); + performWriteStartHandshake(transfer, TEST_DATA_100B.size()); assertThat(future.cancel(true)).isTrue(); assertThat(future.isCancelled()).isTrue(); - receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId()) .setOffset(0) .setWindowEndOffset(50) .setMaxChunkSizeBytes(50)); - assertThat(lastChunks()).contains(finalChunk(123, Status.CANCELLED)); - receiveWriteChunks(newChunk(Chunk.Type.COMPLETION_ACK, 123)); + assertThat(lastChunks()).contains(finalChunk(transfer.getSessionId(), Status.CANCELLED)); + receiveWriteChunks(newChunk(Chunk.Type.COMPLETION_ACK, transfer.getSessionId())); } @Test public void write_immediateTransferProtocolError_aborts() { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); - receiveWriteChunks(newChunk(Chunk.Type.COMPLETION, VersionedChunk.UNASSIGNED_SESSION_ID) + receiveWriteChunks(newChunk(Chunk.Type.COMPLETION, transfer.getSessionId()) .setResourceId(123) .setStatus(Status.NOT_FOUND.ordinal())); @@ -1994,10 +2121,11 @@ public final class TransferClientTest { public void write_laterTransferProtocolError_aborts() { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); - performWriteStartHandshake(123, 123, TEST_DATA_SHORT.size()); + performWriteStartHandshake(transfer, TEST_DATA_SHORT.size()); - receiveWriteChunks(finalChunk(123, Status.NOT_FOUND)); + receiveWriteChunks(finalChunk(transfer.getSessionId(), Status.NOT_FOUND)); ExecutionException thrown = assertThrows(ExecutionException.class, future::get); assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class); @@ -2020,29 +2148,34 @@ public final class TransferClientTest { public void write_unknownVersion_invalidArgument() { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); - receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 3).setResourceId(2).setProtocolVersion(9)); + receiveWriteChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId()) + .setResourceId(2) + .setProtocolVersion(9)); ExecutionException exception = assertThrows(ExecutionException.class, future::get); assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT); assertThat(lastChunks()) - .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()), - finalChunk(3, Status.INVALID_ARGUMENT)); + .containsExactly(initialWriteChunk(transfer, TEST_DATA_SHORT.size()), + finalChunk(transfer.getSessionId(), Status.INVALID_ARGUMENT)); } @Test public void write_serverRespondsWithUnknownVersion_invalidArgument() { createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_100B.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); - assertThat(lastChunks()) - .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, 100)); + assertThat(lastChunks()).containsExactly(initialWriteChunk(transfer, 100)); - receiveWriteChunks( - newChunk(Chunk.Type.START_ACK, 99).setResourceId(2).setProtocolVersion(600613)); + receiveWriteChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId()) + .setResourceId(2) + .setProtocolVersion(600613)); - assertThat(lastChunks()).containsExactly(finalChunk(99, Status.INVALID_ARGUMENT)); + assertThat(lastChunks()) + .containsExactly(finalChunk(transfer.getSessionId(), Status.INVALID_ARGUMENT)); ExecutionException exception = assertThrows(ExecutionException.class, future::get); assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT); @@ -2052,6 +2185,7 @@ public final class TransferClientTest { public void write_timeoutAfterInitialChunk() { createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO); ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); // Call future.get() without sending any server-side packets. ExecutionException exception = assertThrows(ExecutionException.class, future::get); @@ -2059,10 +2193,9 @@ public final class TransferClientTest { // Client should have resent the last chunk (the initial chunk in this case) for each timeout. assertThat(lastChunks()) - .containsExactly( - initialWriteChunk(123, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()), // initial - initialWriteChunk(123, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()), // retry 1 - initialWriteChunk(123, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size())); // retry 2 + .containsExactly(initialWriteChunk(transfer, TEST_DATA_SHORT.size()), // initial + initialWriteChunk(transfer, TEST_DATA_SHORT.size()), // retry 1 + initialWriteChunk(transfer, TEST_DATA_SHORT.size())); // retry 2 } @Test @@ -2071,25 +2204,25 @@ public final class TransferClientTest { // Wait for two outgoing packets (Write RPC request and first chunk), then do the handshake. enqueueWriteChunks(2, - newChunk(Chunk.Type.START_ACK, 123).setResourceId(9), - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + newChunk(Chunk.Type.START_ACK, transferClient.getNextSessionIdForTest()).setResourceId(9), + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transferClient.getNextSessionIdForTest()) .setOffset(0) .setWindowEndOffset(90) .setMaxChunkSizeBytes(30)); ListenableFuture<Void> future = transferClient.write(9, TEST_DATA_SHORT.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); ExecutionException exception = assertThrows(ExecutionException.class, future::get); assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED); - Chunk data = newChunk(Chunk.Type.DATA, 123) + Chunk data = newChunk(Chunk.Type.DATA, transfer.getSessionId()) .setOffset(0) .setData(TEST_DATA_SHORT) .setRemainingBytes(0) .build(); assertThat(lastChunks()) - .containsExactly( - initialWriteChunk(9, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()), // initial - newChunk(Chunk.Type.START_ACK_CONFIRMATION, 123) + .containsExactly(initialWriteChunk(transfer, TEST_DATA_SHORT.size()), // initial + newChunk(Chunk.Type.START_ACK_CONFIRMATION, transfer.getSessionId()) .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()) .setRemainingBytes(TEST_DATA_SHORT.size()) .build(), @@ -2103,47 +2236,50 @@ public final class TransferClientTest { createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO); assertThat(MAX_RETRIES).isEqualTo(2); // This test assumes 2 retries + int id = transferClient.getNextSessionIdForTest(); + // Wait for four outgoing packets (Write RPC request and START chunk + retry), then handshake. enqueueWriteChunks(3, - newChunk(Chunk.Type.START_ACK, 123) + newChunk(Chunk.Type.START_ACK, id) .setResourceId(5) .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); // Wait for start ack confirmation + 2 retries, then request three packets. enqueueWriteChunks(3, - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id) .setOffset(0) .setWindowEndOffset(60) .setMaxChunkSizeBytes(20)); // After two packets, request the remainder of the packets. enqueueWriteChunks( - 2, newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123).setOffset(20).setWindowEndOffset(200)); + 2, newChunk(Chunk.Type.PARAMETERS_CONTINUE, id).setOffset(20).setWindowEndOffset(200)); // Wait for last 3 data packets, then 2 final packet retries. enqueueWriteChunks(5, - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id) .setOffset(80) .setWindowEndOffset(200) .setMaxChunkSizeBytes(20), - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id) .setOffset(80) .setWindowEndOffset(200) .setMaxChunkSizeBytes(20)); // After the retry, confirm completed multiple times; additional packets should be dropped enqueueWriteChunks(1, - newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code()), - newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code()), - newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code()), - newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code())); + newChunk(Chunk.Type.COMPLETION, id).setStatus(Status.OK.code()), + newChunk(Chunk.Type.COMPLETION, id).setStatus(Status.OK.code()), + newChunk(Chunk.Type.COMPLETION, id).setStatus(Status.OK.code()), + newChunk(Chunk.Type.COMPLETION, id).setStatus(Status.OK.code())); ListenableFuture<Void> future = transferClient.write(5, TEST_DATA_100B.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. final Chunk startAckConfirmation = - newChunk(Chunk.Type.START_ACK_CONFIRMATION, 123) + newChunk(Chunk.Type.START_ACK_CONFIRMATION, id) .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()) .setRemainingBytes(TEST_DATA_100B.size()) .build(); @@ -2151,25 +2287,25 @@ public final class TransferClientTest { assertThat(lastChunks()) .containsExactly( // initial handshake with retries - initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()), - initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()), + initialWriteChunk(transfer, TEST_DATA_100B.size()), + initialWriteChunk(transfer, TEST_DATA_100B.size()), startAckConfirmation, startAckConfirmation, startAckConfirmation, // send all data - dataChunk(123, TEST_DATA_100B, 0, 20), // data 0-20 - dataChunk(123, TEST_DATA_100B, 20, 40), // data 20-40 - dataChunk(123, TEST_DATA_100B, 40, 60), // data 40-60 - dataChunk(123, TEST_DATA_100B, 60, 80), // data 60-80 - dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final) + dataChunk(id, TEST_DATA_100B, 0, 20), // data 0-20 + dataChunk(id, TEST_DATA_100B, 20, 40), // data 20-40 + dataChunk(id, TEST_DATA_100B, 40, 60), // data 40-60 + dataChunk(id, TEST_DATA_100B, 60, 80), // data 60-80 + dataChunk(id, TEST_DATA_100B, 80, 100), // data 80-100 (final) // retry last packet two times - dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final) - dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final) + dataChunk(id, TEST_DATA_100B, 80, 100), // data 80-100 (final) + dataChunk(id, TEST_DATA_100B, 80, 100), // data 80-100 (final) // respond to two PARAMETERS_RETRANSMIT packets - dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final) - dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final) + dataChunk(id, TEST_DATA_100B, 80, 100), // data 80-100 (final) + dataChunk(id, TEST_DATA_100B, 80, 100), // data 80-100 (final) // respond to OK packet - newChunk(Chunk.Type.COMPLETION_ACK, 123).build()); + newChunk(Chunk.Type.COMPLETION_ACK, id).build()); } @Test @@ -2177,98 +2313,105 @@ public final class TransferClientTest { createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO); assertThat(MAX_RETRIES).isEqualTo(2); // This test assumes 2 retries + int id = transferClient.getNextSessionIdForTest(); + // Wait for two outgoing packets (Write RPC request and START chunk), then do the handshake. enqueueWriteChunks(2, - newChunk(Chunk.Type.START_ACK, 123) + newChunk(Chunk.Type.START_ACK, id) .setResourceId(5) .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); // Request two packets. enqueueWriteChunks(1, - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id) .setOffset(0) .setWindowEndOffset(40) .setMaxChunkSizeBytes(20)); // After the second retry, send more transfer parameters enqueueWriteChunks(4, - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id) .setOffset(40) .setWindowEndOffset(120) .setMaxChunkSizeBytes(40)); // After the first retry, send more transfer parameters enqueueWriteChunks(3, - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id) .setOffset(80) .setWindowEndOffset(160) .setMaxChunkSizeBytes(10)); // After the second retry, confirm completed - enqueueWriteChunks(4, newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code())); - enqueueWriteChunks(1, newChunk(Chunk.Type.COMPLETION_ACK, 123)); + enqueueWriteChunks(4, newChunk(Chunk.Type.COMPLETION, id).setStatus(Status.OK.code())); + enqueueWriteChunks(1, newChunk(Chunk.Type.COMPLETION_ACK, id)); ListenableFuture<Void> future = transferClient.write(5, TEST_DATA_100B.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown. assertThat(lastChunks()) .containsExactly( // initial handshake - initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()), - newChunk(Chunk.Type.START_ACK_CONFIRMATION, 123) + initialWriteChunk(transfer, TEST_DATA_100B.size()), + newChunk(Chunk.Type.START_ACK_CONFIRMATION, id) .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()) .setRemainingBytes(TEST_DATA_100B.size()) .build(), // after 2, receive parameters: 40 from 0 by 20 - dataChunk(123, TEST_DATA_100B, 0, 20), // data 0-20 - dataChunk(123, TEST_DATA_100B, 20, 40), // data 20-40 - dataChunk(123, TEST_DATA_100B, 20, 40), // retry 1 - dataChunk(123, TEST_DATA_100B, 20, 40), // retry 2 + dataChunk(id, TEST_DATA_100B, 0, 20), // data 0-20 + dataChunk(id, TEST_DATA_100B, 20, 40), // data 20-40 + dataChunk(id, TEST_DATA_100B, 20, 40), // retry 1 + dataChunk(id, TEST_DATA_100B, 20, 40), // retry 2 // after 4, receive parameters: 80 from 40 by 40 - dataChunk(123, TEST_DATA_100B, 40, 80), // data 40-80 - dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 - dataChunk(123, TEST_DATA_100B, 80, 100), // retry 1 + dataChunk(id, TEST_DATA_100B, 40, 80), // data 40-80 + dataChunk(id, TEST_DATA_100B, 80, 100), // data 80-100 + dataChunk(id, TEST_DATA_100B, 80, 100), // retry 1 // after 3, receive parameters: 80 from 80 by 10 - dataChunk(123, TEST_DATA_100B, 80, 90), // data 80-90 - dataChunk(123, TEST_DATA_100B, 90, 100), // data 90-100 - dataChunk(123, TEST_DATA_100B, 90, 100), // retry 1 - dataChunk(123, TEST_DATA_100B, 90, 100), // retry 2 + dataChunk(id, TEST_DATA_100B, 80, 90), // data 80-90 + dataChunk(id, TEST_DATA_100B, 90, 100), // data 90-100 + dataChunk(id, TEST_DATA_100B, 90, 100), // retry 1 + dataChunk(id, TEST_DATA_100B, 90, 100), // retry 2 // after 4, receive final OK - newChunk(Chunk.Type.COMPLETION_ACK, 123).build()); + newChunk(Chunk.Type.COMPLETION_ACK, id).build()); } + @Test - public void write_maxLifetimeRetries() throws Exception { + public void write_maxLifetimeRetries() { createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO, 5); assertThat(MAX_RETRIES).isEqualTo(2); // This test assumes 2 retries + int id = transferClient.getNextSessionIdForTest(); + // Wait for four outgoing packets (Write RPC request and START chunk + 2 retries) enqueueWriteChunks(4, // 2 retries - newChunk(Chunk.Type.START_ACK, 123) + newChunk(Chunk.Type.START_ACK, id) .setResourceId(5) .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); // Wait for start ack confirmation + 2 retries, then request three packets. enqueueWriteChunks(3, // 2 retries - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id) .setOffset(0) .setWindowEndOffset(60) .setMaxChunkSizeBytes(20)); // After 3 data packets, wait for two more retries, which should put this over the retry limit. enqueueWriteChunks(5, // 2 retries - newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) // This packet should be ignored + newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id) // This packet should be ignored .setOffset(80) .setWindowEndOffset(200) .setMaxChunkSizeBytes(20)); ListenableFuture<Void> future = transferClient.write(5, TEST_DATA_100B.toByteArray()); + WriteTransfer transfer = transferClient.getWriteTransferForTest(future); ExecutionException exception = assertThrows(ExecutionException.class, future::get); assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED); final Chunk startAckConfirmation = - newChunk(Chunk.Type.START_ACK_CONFIRMATION, 123) + newChunk(Chunk.Type.START_ACK_CONFIRMATION, id) .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()) .setRemainingBytes(TEST_DATA_100B.size()) .build(); @@ -2276,19 +2419,19 @@ public final class TransferClientTest { assertThat(lastChunks()) .containsExactly( // initial chunk and 2 retries - initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()), - initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()), - initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()), + initialWriteChunk(transfer, TEST_DATA_100B.size()), + initialWriteChunk(transfer, TEST_DATA_100B.size()), + initialWriteChunk(transfer, TEST_DATA_100B.size()), // START_ACK_CONFIRMATION and 2 retries startAckConfirmation, startAckConfirmation, startAckConfirmation, // send all data - dataChunk(123, TEST_DATA_100B, 0, 20), // data 0-20 - dataChunk(123, TEST_DATA_100B, 20, 40), // data 20-40 - dataChunk(123, TEST_DATA_100B, 40, 60), // data 40-60 + dataChunk(id, TEST_DATA_100B, 0, 20), // data 0-20 + dataChunk(id, TEST_DATA_100B, 20, 40), // data 20-40 + dataChunk(id, TEST_DATA_100B, 40, 60), // data 40-60 // last packet retry, then hit the lifetime retry limit and abort - dataChunk(123, TEST_DATA_100B, 40, 60)); // data 40-60 + dataChunk(id, TEST_DATA_100B, 40, 60)); // data 40-60 } private static ByteString range(int startInclusive, int endExclusive) { @@ -2310,21 +2453,35 @@ public final class TransferClientTest { return Chunk.newBuilder().setType(type).setSessionId(sessionId); } - private static Chunk initialReadChunk(int resourceId, ProtocolVersion version) { - return initialReadChunk(resourceId, version, TRANSFER_PARAMETERS); + private static Chunk initialReadChunk(ReadTransfer transfer) { + Chunk.Builder chunk = + newLegacyChunk(Chunk.Type.START, transfer.getResourceId()) + .setResourceId(transfer.getResourceId()) + .setPendingBytes(transfer.getParametersForTest().maxPendingBytes()) + .setWindowEndOffset(transfer.getParametersForTest().maxPendingBytes()) + .setMaxChunkSizeBytes(transfer.getParametersForTest().maxChunkSizeBytes()) + .setOffset(0); + if (transfer.getDesiredProtocolVersionForTest() != ProtocolVersion.LEGACY) { + chunk.setProtocolVersion(transfer.getDesiredProtocolVersionForTest().ordinal()); + chunk.setDesiredSessionId(transfer.getSessionId()); + } + if (transfer.getParametersForTest().chunkDelayMicroseconds() > 0) { + chunk.setMinDelayMicroseconds(transfer.getParametersForTest().chunkDelayMicroseconds()); + } + return chunk.build(); } - private static Chunk initialReadChunk( - int resourceId, ProtocolVersion version, TransferParameters params) { + private static Chunk initialLegacyReadChunk(int resourceId) { + return initialLegacyReadChunk(resourceId, TRANSFER_PARAMETERS); + } + + private static Chunk initialLegacyReadChunk(int resourceId, TransferParameters params) { Chunk.Builder chunk = newLegacyChunk(Chunk.Type.START, resourceId) .setResourceId(resourceId) .setPendingBytes(params.maxPendingBytes()) .setWindowEndOffset(params.maxPendingBytes()) .setMaxChunkSizeBytes(params.maxChunkSizeBytes()) .setOffset(0); - if (version != ProtocolVersion.LEGACY) { - chunk.setProtocolVersion(version.ordinal()); - } if (params.chunkDelayMicroseconds() > 0) { chunk.setMinDelayMicroseconds(params.chunkDelayMicroseconds()); } @@ -2343,12 +2500,20 @@ public final class TransferClientTest { return chunk.build(); } - private static Chunk initialWriteChunk(int resourceId, ProtocolVersion version, int size) { - Chunk.Builder chunk = newLegacyChunk(Chunk.Type.START, resourceId) - .setResourceId(resourceId) + private static Chunk initialLegacyWriteChunk(int resourceId, int size) { + return newLegacyChunk(Chunk.Type.START, resourceId) + .setResourceId(resourceId) + .setRemainingBytes(size) + .build(); + } + + private static Chunk initialWriteChunk(WriteTransfer transfer, int size) { + Chunk.Builder chunk = newLegacyChunk(Chunk.Type.START, transfer.getResourceId()) + .setResourceId(transfer.getResourceId()) .setRemainingBytes(size); - if (version != ProtocolVersion.LEGACY) { + if (transfer.getDesiredProtocolVersionForTest() != ProtocolVersion.LEGACY) { chunk.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()); + chunk.setDesiredSessionId(transfer.getSessionId()); } return chunk.build(); } @@ -2415,20 +2580,16 @@ public final class TransferClientTest { } } - private void performReadStartHandshake(int resourceId, int sessionId) { - performReadStartHandshake( - resourceId, sessionId, TransferClient.DEFAULT_READ_TRANSFER_PARAMETERS); - } + private void performReadStartHandshake(ReadTransfer transfer) { + assertThat(lastChunks()).containsExactly(initialReadChunk(transfer)); - private void performReadStartHandshake(int resourceId, int sessionId, TransferParameters params) { - assertThat(lastChunks()) - .containsExactly(initialReadChunk(resourceId, ProtocolVersion.VERSION_TWO, params)); - - receiveReadChunks(newChunk(Chunk.Type.START_ACK, sessionId) - .setResourceId(resourceId) + receiveReadChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId()) + .setResourceId(transfer.getResourceId()) .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); - assertThat(lastChunks()).containsExactly(readStartAckConfirmation(sessionId, params)); + assertThat(lastChunks()) + .containsExactly( + readStartAckConfirmation(transfer.getSessionId(), transfer.getParametersForTest())); } private void performReadCompletionHandshake(int sessionId, Status status) { @@ -2442,16 +2603,15 @@ public final class TransferClientTest { receiveReadChunks(newChunk(Chunk.Type.COMPLETION_ACK, sessionId)); } - private void performWriteStartHandshake(int resourceId, int sessionId, int dataSize) { - assertThat(lastChunks()) - .containsExactly(initialWriteChunk(resourceId, ProtocolVersion.VERSION_TWO, dataSize)); + private void performWriteStartHandshake(WriteTransfer transfer, int dataSize) { + assertThat(lastChunks()).containsExactly(initialWriteChunk(transfer, dataSize)); - receiveWriteChunks(newChunk(Chunk.Type.START_ACK, sessionId) - .setResourceId(resourceId) + receiveWriteChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId()) + .setResourceId(transfer.getResourceId()) .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())); assertThat(lastChunks()) - .containsExactly(newChunk(Chunk.Type.START_ACK_CONFIRMATION, sessionId) + .containsExactly(newChunk(Chunk.Type.START_ACK_CONFIRMATION, transfer.getSessionId()) .setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()) .setRemainingBytes(dataSize) .build()); |